fix: queue client && inmemory repository

This commit is contained in:
2024-03-11 23:15:10 +07:00
parent 198452d357
commit d72d069309
3 changed files with 51 additions and 35 deletions

View File

@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using System.Data; using System.Data;
using InServiceQue.Core.Models; using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories; using InServiceQue.Core.Repositories;
@@ -7,9 +6,9 @@ namespace InServiceQue.InMemory;
public class TaskRepositoryInMemory: ITaskRepository public class TaskRepositoryInMemory: ITaskRepository
{ {
//todo: concurrent access here private static object _lockObject = new();
private static int _currentShard; private static int _currentShard;
private static ConcurrentDictionary<string, ConcurrentQueue<QueueTask>> _quesByType = new(); private static Dictionary<string, Queue<QueueTask>> _quesByType = new();
public void Dispose() public void Dispose()
{ {
// TODO release managed resources here // TODO release managed resources here
@@ -17,22 +16,22 @@ public class TaskRepositoryInMemory: ITaskRepository
public void Insert(QueueTask task) public void Insert(QueueTask task)
{ {
if (_quesByType.ContainsKey(task.TaskType)) lock (_lockObject)
{ {
_quesByType.AddOrUpdate(task.TaskType, if (_quesByType.ContainsKey(task.TaskType))
(t) => new ConcurrentQueue<QueueTask>(new[] { task }), {
(t, queue) => _quesByType[task.TaskType].Enqueue(task);
{ }
queue.Enqueue(task); else
return queue; {
} _quesByType.Add(task.TaskType, new Queue<QueueTask>(new []{task}));
); }
} }
} }
public async Task InsertAsync(QueueTask task) public async Task InsertAsync(QueueTask task)
{ {
Insert(task); Insert(task);
await Task.CompletedTask; await Task.CompletedTask;
} }
@@ -84,26 +83,27 @@ public class TaskRepositoryInMemory: ITaskRepository
public QueueTask? GetNextTaskRolling() public QueueTask? GetNextTaskRolling()
{ {
QueueTask? task = null; QueueTask? task = null;
var topicsCount = _quesByType.Count; lock (_lockObject)
for (int i = 0; i < topicsCount; i++)
{ {
//todo: concurrent access var topicsCount = _quesByType.Count;
if (i == _currentShard) for (int i = 0; i < topicsCount; i++)
{ {
var keyValuePair = _quesByType.ToArray()[i]; if (i == _currentShard)
var que = keyValuePair.Value;
que.TryDequeue(out task);
if (que.IsEmpty)
{ {
_quesByType.TryRemove(keyValuePair); var keyValuePair = _quesByType.ToArray()[i];
} var que = keyValuePair.Value;
task = que.Dequeue();
break; if (!que.Any())
{
_quesByType.Remove(keyValuePair.Key);
}
break;
}
} }
_currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1;
} }
//todo: concurrent access
_currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1;
return task; return task;
} }
@@ -116,11 +116,15 @@ public class TaskRepositoryInMemory: ITaskRepository
return await Task.FromResult(GetNextTaskRolling()); return await Task.FromResult(GetNextTaskRolling());
QueueTask? task = null; QueueTask? task = null;
if (_quesByType.TryGetValue(taskType, out var que)) lock (_lockObject)
{ {
que.TryDequeue(out task); if (_quesByType.TryGetValue(taskType, out var que))
{
que.TryDequeue(out task);
}
} }
return await Task.FromResult(task); return await Task.FromResult(task);
} }

View File

@@ -4,6 +4,7 @@ using InServiceQue.Core.Repositories;
using InServiceQue.InMemory; using InServiceQue.InMemory;
using InServiceQue.Sample; using InServiceQue.Sample;
using InServiceQue.Services; using InServiceQue.Services;
using Microsoft.AspNetCore.Http.HttpResults;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
@@ -16,12 +17,18 @@ var app = builder.Build();
app.MapGet("/", async (string msg) => app.MapGet("/", async (string msg) =>
{ {
var taskRepository = app.Services.GetService<IQueueClient>(); if (!string.IsNullOrEmpty(msg))
await taskRepository.AddTaskAsync(new QueueTask(new SendMessageTask(new SendMessagePayload()
{ {
From = "John", To = "Esther", Message = msg var taskRepository = app.Services.GetService<IQueueClient>();
}))); await taskRepository.AddTaskAsync(new SendMessageTask(new SendMessagePayload()
await taskRepository.AddTaskAsync(new QueueTask(new OtherMessageTask(msg))); {
From = "John", To = "Esther", Message = msg
}));
await taskRepository.AddTaskAsync(new OtherMessageTask(msg));
return new OkResult();
}
return new OkResult(); return new OkResult();
}); });

View File

@@ -7,6 +7,11 @@ public class QueueClient: IQueueClient
{ {
private readonly ITaskRepository _repository; private readonly ITaskRepository _repository;
public QueueClient(ITaskRepository repository)
{
_repository = repository;
}
public void AddTask(IQueueTask task) public void AddTask(IQueueTask task)
{ {
var queueTask = new QueueTask(task); var queueTask = new QueueTask(task);