diff --git a/InServiceQue.InMemory/TaskRepositoryInMemory.cs b/InServiceQue.InMemory/TaskRepositoryInMemory.cs index e208dd4..0eeb2ce 100644 --- a/InServiceQue.InMemory/TaskRepositoryInMemory.cs +++ b/InServiceQue.InMemory/TaskRepositoryInMemory.cs @@ -1,4 +1,3 @@ -using System.Collections.Concurrent; using System.Data; using InServiceQue.Core.Models; using InServiceQue.Core.Repositories; @@ -7,9 +6,9 @@ namespace InServiceQue.InMemory; public class TaskRepositoryInMemory: ITaskRepository { - //todo: concurrent access here + private static object _lockObject = new(); private static int _currentShard; - private static ConcurrentDictionary> _quesByType = new(); + private static Dictionary> _quesByType = new(); public void Dispose() { // TODO release managed resources here @@ -17,22 +16,22 @@ public class TaskRepositoryInMemory: ITaskRepository public void Insert(QueueTask task) { - if (_quesByType.ContainsKey(task.TaskType)) + lock (_lockObject) { - _quesByType.AddOrUpdate(task.TaskType, - (t) => new ConcurrentQueue(new[] { task }), - (t, queue) => - { - queue.Enqueue(task); - return queue; - } - ); + if (_quesByType.ContainsKey(task.TaskType)) + { + _quesByType[task.TaskType].Enqueue(task); + } + else + { + _quesByType.Add(task.TaskType, new Queue(new []{task})); + } + } } public async Task InsertAsync(QueueTask task) { - Insert(task); await Task.CompletedTask; } @@ -84,26 +83,27 @@ public class TaskRepositoryInMemory: ITaskRepository public QueueTask? GetNextTaskRolling() { QueueTask? task = null; - var topicsCount = _quesByType.Count; - for (int i = 0; i < topicsCount; i++) + lock (_lockObject) { - //todo: concurrent access - if (i == _currentShard) + var topicsCount = _quesByType.Count; + for (int i = 0; i < topicsCount; i++) { - var keyValuePair = _quesByType.ToArray()[i]; - var que = keyValuePair.Value; - que.TryDequeue(out task); - - if (que.IsEmpty) + if (i == _currentShard) { - _quesByType.TryRemove(keyValuePair); - } + var keyValuePair = _quesByType.ToArray()[i]; + var que = keyValuePair.Value; + task = que.Dequeue(); - break; + if (!que.Any()) + { + _quesByType.Remove(keyValuePair.Key); + } + break; + } } + + _currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1; } - //todo: concurrent access - _currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1; return task; } @@ -116,11 +116,15 @@ public class TaskRepositoryInMemory: ITaskRepository return await Task.FromResult(GetNextTaskRolling()); QueueTask? task = null; - if (_quesByType.TryGetValue(taskType, out var que)) + lock (_lockObject) { - que.TryDequeue(out task); + if (_quesByType.TryGetValue(taskType, out var que)) + { + que.TryDequeue(out task); + } } + return await Task.FromResult(task); } diff --git a/InServiceQue.Sample/Program.cs b/InServiceQue.Sample/Program.cs index 4a688e4..a4b74bd 100644 --- a/InServiceQue.Sample/Program.cs +++ b/InServiceQue.Sample/Program.cs @@ -4,6 +4,7 @@ using InServiceQue.Core.Repositories; using InServiceQue.InMemory; using InServiceQue.Sample; using InServiceQue.Services; +using Microsoft.AspNetCore.Http.HttpResults; using Microsoft.AspNetCore.Mvc; var builder = WebApplication.CreateBuilder(args); @@ -16,12 +17,18 @@ var app = builder.Build(); app.MapGet("/", async (string msg) => { - var taskRepository = app.Services.GetService(); - await taskRepository.AddTaskAsync(new QueueTask(new SendMessageTask(new SendMessagePayload() + if (!string.IsNullOrEmpty(msg)) { - From = "John", To = "Esther", Message = msg - }))); - await taskRepository.AddTaskAsync(new QueueTask(new OtherMessageTask(msg))); + var taskRepository = app.Services.GetService(); + await taskRepository.AddTaskAsync(new SendMessageTask(new SendMessagePayload() + { + From = "John", To = "Esther", Message = msg + })); + await taskRepository.AddTaskAsync(new OtherMessageTask(msg)); + + return new OkResult(); + } + return new OkResult(); }); diff --git a/InServiceQue/Services/QueueClient.cs b/InServiceQue/Services/QueueClient.cs index cc40e59..de4372c 100644 --- a/InServiceQue/Services/QueueClient.cs +++ b/InServiceQue/Services/QueueClient.cs @@ -6,7 +6,12 @@ namespace InServiceQue.Services; public class QueueClient: IQueueClient { private readonly ITaskRepository _repository; - + + public QueueClient(ITaskRepository repository) + { + _repository = repository; + } + public void AddTask(IQueueTask task) { var queueTask = new QueueTask(task);