From ae1175cf516ffe1099b0fbbabfee65db35381fc9 Mon Sep 17 00:00:00 2001 From: HiveBeats Date: Mon, 11 Mar 2024 22:51:23 +0700 Subject: [PATCH] feat: explicit queue client, resolve handler avoiding type registry --- .../Repositories/ITaskRepository.cs | 4 +- .../TaskRepositoryInMemory.cs | 68 +++++++++++++++++-- InServiceQue.Postgres/TaskRepository.cs | 4 +- InServiceQue.Sample/Program.cs | 7 +- InServiceQue/InServiceQueBuilder.cs | 1 + InServiceQue/Services/IQueueClient.cs | 9 +++ InServiceQue/Services/IQueueService.cs | 2 - InServiceQue/Services/QueueClient.cs | 21 ++++++ InServiceQue/Services/QueueService.cs | 24 +++---- InServiceQue/Services/QueueTypeRegistry.cs | 1 + 10 files changed, 110 insertions(+), 31 deletions(-) create mode 100644 InServiceQue/Services/IQueueClient.cs create mode 100644 InServiceQue/Services/QueueClient.cs diff --git a/InServiceQue.Core/Repositories/ITaskRepository.cs b/InServiceQue.Core/Repositories/ITaskRepository.cs index dc066be..ed68b49 100644 --- a/InServiceQue.Core/Repositories/ITaskRepository.cs +++ b/InServiceQue.Core/Repositories/ITaskRepository.cs @@ -13,8 +13,8 @@ public interface ITaskRepository: IDisposable Task CommitTransactionAsync(IDbTransaction tx); void RollbackTransaction(IDbTransaction tx); Task RollbackTransactionAsync(IDbTransaction tx); - QueueTask? GetNextTask(IDbTransaction tx); - Task GetNextTaskAsync(IDbTransaction tx); + QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null); + Task GetNextTaskAsync(IDbTransaction tx, string? taskType = null); void SaveTask(QueueTask task, IDbTransaction tx); Task SaveTaskAsync(QueueTask task, IDbTransaction tx); } \ No newline at end of file diff --git a/InServiceQue.InMemory/TaskRepositoryInMemory.cs b/InServiceQue.InMemory/TaskRepositoryInMemory.cs index 5f97efd..e208dd4 100644 --- a/InServiceQue.InMemory/TaskRepositoryInMemory.cs +++ b/InServiceQue.InMemory/TaskRepositoryInMemory.cs @@ -7,7 +7,9 @@ namespace InServiceQue.InMemory; public class TaskRepositoryInMemory: ITaskRepository { - private static ConcurrentQueue _que = new(); + //todo: concurrent access here + private static int _currentShard; + private static ConcurrentDictionary> _quesByType = new(); public void Dispose() { // TODO release managed resources here @@ -15,11 +17,22 @@ public class TaskRepositoryInMemory: ITaskRepository public void Insert(QueueTask task) { - _que.Enqueue(task); + if (_quesByType.ContainsKey(task.TaskType)) + { + _quesByType.AddOrUpdate(task.TaskType, + (t) => new ConcurrentQueue(new[] { task }), + (t, queue) => + { + queue.Enqueue(task); + return queue; + } + ); + } } public async Task InsertAsync(QueueTask task) { + Insert(task); await Task.CompletedTask; } @@ -54,19 +67,60 @@ public class TaskRepositoryInMemory: ITaskRepository await Task.CompletedTask; } - public QueueTask? GetNextTask(IDbTransaction tx) + public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null) { - QueueTask? task; - _que.TryDequeue(out task); + if (taskType == null) + return GetNextTaskRolling(); + + QueueTask? task = null; + if (_quesByType.TryGetValue(taskType, out var que)) + { + que.TryDequeue(out task); + } return task; } - public async Task GetNextTaskAsync(IDbTransaction tx) + public QueueTask? GetNextTaskRolling() { QueueTask? task = null; - _que.TryDequeue(out task); + var topicsCount = _quesByType.Count; + for (int i = 0; i < topicsCount; i++) + { + //todo: concurrent access + if (i == _currentShard) + { + var keyValuePair = _quesByType.ToArray()[i]; + var que = keyValuePair.Value; + que.TryDequeue(out task); + if (que.IsEmpty) + { + _quesByType.TryRemove(keyValuePair); + } + + break; + } + } + //todo: concurrent access + _currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1; + + return task; + } + + + + public async Task GetNextTaskAsync(IDbTransaction tx, string? taskType = null) + { + if (taskType == null) + return await Task.FromResult(GetNextTaskRolling()); + + QueueTask? task = null; + if (_quesByType.TryGetValue(taskType, out var que)) + { + que.TryDequeue(out task); + } + return await Task.FromResult(task); } diff --git a/InServiceQue.Postgres/TaskRepository.cs b/InServiceQue.Postgres/TaskRepository.cs index fe6d682..6320382 100644 --- a/InServiceQue.Postgres/TaskRepository.cs +++ b/InServiceQue.Postgres/TaskRepository.cs @@ -61,7 +61,7 @@ public class TaskRepository: ITaskRepository await ((NpgsqlTransaction)tx).RollbackAsync(); } - public QueueTask? GetNextTask(IDbTransaction tx) + public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null) { var sql = $@"select * from QUEUE where order by {nameof(QueueTask.DateCreated)} @@ -71,7 +71,7 @@ public class TaskRepository: ITaskRepository return _connection.QuerySingleOrDefault(sql, tx); } - public async Task GetNextTaskAsync(IDbTransaction tx) + public async Task GetNextTaskAsync(IDbTransaction tx, string? taskType = null) { var sql = $@"select * from QUEUE order by {nameof(QueueTask.DateCreated)} diff --git a/InServiceQue.Sample/Program.cs b/InServiceQue.Sample/Program.cs index a56ca6a..4a688e4 100644 --- a/InServiceQue.Sample/Program.cs +++ b/InServiceQue.Sample/Program.cs @@ -3,6 +3,7 @@ using InServiceQue.Core.Models; using InServiceQue.Core.Repositories; using InServiceQue.InMemory; using InServiceQue.Sample; +using InServiceQue.Services; using Microsoft.AspNetCore.Mvc; var builder = WebApplication.CreateBuilder(args); @@ -15,12 +16,12 @@ var app = builder.Build(); app.MapGet("/", async (string msg) => { - var taskRepository = app.Services.GetService(); - await taskRepository.InsertAsync(new QueueTask(new SendMessageTask(new SendMessagePayload() + var taskRepository = app.Services.GetService(); + await taskRepository.AddTaskAsync(new QueueTask(new SendMessageTask(new SendMessagePayload() { From = "John", To = "Esther", Message = msg }))); - await taskRepository.InsertAsync(new QueueTask(new OtherMessageTask(msg))); + await taskRepository.AddTaskAsync(new QueueTask(new OtherMessageTask(msg))); return new OkResult(); }); diff --git a/InServiceQue/InServiceQueBuilder.cs b/InServiceQue/InServiceQueBuilder.cs index 24134a0..9eb9a5b 100644 --- a/InServiceQue/InServiceQueBuilder.cs +++ b/InServiceQue/InServiceQueBuilder.cs @@ -20,6 +20,7 @@ public class InServiceQueBuilder { _typeRegistry = new QueueTypeRegistry(); _services.AddSingleton(_typeRegistry); + _services.AddTransient(); _hostedServiceRegistrator = new HostedServiceRegistrator(); } diff --git a/InServiceQue/Services/IQueueClient.cs b/InServiceQue/Services/IQueueClient.cs new file mode 100644 index 0000000..5085c1a --- /dev/null +++ b/InServiceQue/Services/IQueueClient.cs @@ -0,0 +1,9 @@ +using InServiceQue.Core.Models; + +namespace InServiceQue.Services; + +public interface IQueueClient +{ + void AddTask(IQueueTask task); + Task AddTaskAsync(IQueueTask task); +} \ No newline at end of file diff --git a/InServiceQue/Services/IQueueService.cs b/InServiceQue/Services/IQueueService.cs index 41f21d6..c387f57 100644 --- a/InServiceQue/Services/IQueueService.cs +++ b/InServiceQue/Services/IQueueService.cs @@ -5,11 +5,9 @@ namespace InServiceQue.Services; public interface IQueueService: IHostedService { - void AddTask(IQueueTask task); void TryProcessTask(); } public interface IQueueService : IQueueService where T : IQueueTask { - } \ No newline at end of file diff --git a/InServiceQue/Services/QueueClient.cs b/InServiceQue/Services/QueueClient.cs new file mode 100644 index 0000000..cc40e59 --- /dev/null +++ b/InServiceQue/Services/QueueClient.cs @@ -0,0 +1,21 @@ +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; + +namespace InServiceQue.Services; + +public class QueueClient: IQueueClient +{ + private readonly ITaskRepository _repository; + + public void AddTask(IQueueTask task) + { + var queueTask = new QueueTask(task); + _repository.Insert(queueTask); + } + + public async Task AddTaskAsync(IQueueTask task) + { + var queueTask = new QueueTask(task); + await _repository.InsertAsync(queueTask); + } +} \ No newline at end of file diff --git a/InServiceQue/Services/QueueService.cs b/InServiceQue/Services/QueueService.cs index 5a1761a..f36343a 100644 --- a/InServiceQue/Services/QueueService.cs +++ b/InServiceQue/Services/QueueService.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using InServiceQue.Core.Models; using InServiceQue.Core.Repositories; using Microsoft.Extensions.DependencyInjection; @@ -19,18 +20,6 @@ where T: IQueueTask _serviceProvider = serviceProvider; } - public void AddTask(IQueueTask task) - { - var queueTask = new QueueTask(task); - _repository.Insert(queueTask); - } - - public async Task AddTaskAsync(IQueueTask task) - { - var queueTask = new QueueTask(task); - await _repository.InsertAsync(queueTask); - } - public void TryProcessTask() { using (var scope = _serviceProvider.CreateScope()) @@ -44,6 +33,8 @@ where T: IQueueTask } var taskType = queueTask.TaskType; + var ttype = typeof(T); + Debug.WriteLine($"TType: {ttype}, TaskType: {taskType}"); var handler = _typeRegistry.GetService(scope, taskType); try @@ -64,15 +55,18 @@ where T: IQueueTask using (var scope = _serviceProvider.CreateScope()) { using var tx = await _repository.StartTransactionAsync(); - var queueTask = await _repository.GetNextTaskAsync(tx); + + var taskType = typeof(T); + + var queueTask = await _repository.GetNextTaskAsync(tx, taskType.Name); if (queueTask == null) { await _repository.RollbackTransactionAsync(tx); return; } - var taskType = queueTask.TaskType; - var handler = _typeRegistry.GetService(scope, taskType); + var serviceHandler = scope.ServiceProvider.GetService(typeof(IQueueHandler<>).MakeGenericType(taskType)); + var handler = (IQueueHandler)serviceHandler; try { diff --git a/InServiceQue/Services/QueueTypeRegistry.cs b/InServiceQue/Services/QueueTypeRegistry.cs index 80a48ee..1fe3d9e 100644 --- a/InServiceQue/Services/QueueTypeRegistry.cs +++ b/InServiceQue/Services/QueueTypeRegistry.cs @@ -14,6 +14,7 @@ public class QueueTypeRegistry: ITypeRegistry public IQueueHandler GetService(IServiceScope scope, string taskType) { var classType = _typeRegistry[taskType]; + var service = scope.ServiceProvider.GetService(classType); return (IQueueHandler)service; }