feat: explicit queue client, resolve handler avoiding type registry
This commit is contained in:
@@ -13,8 +13,8 @@ public interface ITaskRepository: IDisposable
|
|||||||
Task CommitTransactionAsync(IDbTransaction tx);
|
Task CommitTransactionAsync(IDbTransaction tx);
|
||||||
void RollbackTransaction(IDbTransaction tx);
|
void RollbackTransaction(IDbTransaction tx);
|
||||||
Task RollbackTransactionAsync(IDbTransaction tx);
|
Task RollbackTransactionAsync(IDbTransaction tx);
|
||||||
QueueTask? GetNextTask(IDbTransaction tx);
|
QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null);
|
||||||
Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx);
|
Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null);
|
||||||
void SaveTask(QueueTask task, IDbTransaction tx);
|
void SaveTask(QueueTask task, IDbTransaction tx);
|
||||||
Task SaveTaskAsync(QueueTask task, IDbTransaction tx);
|
Task SaveTaskAsync(QueueTask task, IDbTransaction tx);
|
||||||
}
|
}
|
||||||
@@ -7,7 +7,9 @@ namespace InServiceQue.InMemory;
|
|||||||
|
|
||||||
public class TaskRepositoryInMemory: ITaskRepository
|
public class TaskRepositoryInMemory: ITaskRepository
|
||||||
{
|
{
|
||||||
private static ConcurrentQueue<QueueTask> _que = new();
|
//todo: concurrent access here
|
||||||
|
private static int _currentShard;
|
||||||
|
private static ConcurrentDictionary<string, ConcurrentQueue<QueueTask>> _quesByType = new();
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
// TODO release managed resources here
|
// TODO release managed resources here
|
||||||
@@ -15,11 +17,22 @@ public class TaskRepositoryInMemory: ITaskRepository
|
|||||||
|
|
||||||
public void Insert(QueueTask task)
|
public void Insert(QueueTask task)
|
||||||
{
|
{
|
||||||
_que.Enqueue(task);
|
if (_quesByType.ContainsKey(task.TaskType))
|
||||||
|
{
|
||||||
|
_quesByType.AddOrUpdate(task.TaskType,
|
||||||
|
(t) => new ConcurrentQueue<QueueTask>(new[] { task }),
|
||||||
|
(t, queue) =>
|
||||||
|
{
|
||||||
|
queue.Enqueue(task);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task InsertAsync(QueueTask task)
|
public async Task InsertAsync(QueueTask task)
|
||||||
{
|
{
|
||||||
|
|
||||||
Insert(task);
|
Insert(task);
|
||||||
await Task.CompletedTask;
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
@@ -54,19 +67,60 @@ public class TaskRepositoryInMemory: ITaskRepository
|
|||||||
await Task.CompletedTask;
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public QueueTask? GetNextTask(IDbTransaction tx)
|
public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
|
||||||
{
|
{
|
||||||
QueueTask? task;
|
if (taskType == null)
|
||||||
_que.TryDequeue(out task);
|
return GetNextTaskRolling();
|
||||||
|
|
||||||
|
QueueTask? task = null;
|
||||||
|
if (_quesByType.TryGetValue(taskType, out var que))
|
||||||
|
{
|
||||||
|
que.TryDequeue(out task);
|
||||||
|
}
|
||||||
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx)
|
public QueueTask? GetNextTaskRolling()
|
||||||
{
|
{
|
||||||
QueueTask? task = null;
|
QueueTask? task = null;
|
||||||
_que.TryDequeue(out task);
|
var topicsCount = _quesByType.Count;
|
||||||
|
for (int i = 0; i < topicsCount; i++)
|
||||||
|
{
|
||||||
|
//todo: concurrent access
|
||||||
|
if (i == _currentShard)
|
||||||
|
{
|
||||||
|
var keyValuePair = _quesByType.ToArray()[i];
|
||||||
|
var que = keyValuePair.Value;
|
||||||
|
que.TryDequeue(out task);
|
||||||
|
|
||||||
|
if (que.IsEmpty)
|
||||||
|
{
|
||||||
|
_quesByType.TryRemove(keyValuePair);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//todo: concurrent access
|
||||||
|
_currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1;
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
|
||||||
|
{
|
||||||
|
if (taskType == null)
|
||||||
|
return await Task.FromResult(GetNextTaskRolling());
|
||||||
|
|
||||||
|
QueueTask? task = null;
|
||||||
|
if (_quesByType.TryGetValue(taskType, out var que))
|
||||||
|
{
|
||||||
|
que.TryDequeue(out task);
|
||||||
|
}
|
||||||
|
|
||||||
return await Task.FromResult(task);
|
return await Task.FromResult(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ public class TaskRepository: ITaskRepository
|
|||||||
await ((NpgsqlTransaction)tx).RollbackAsync();
|
await ((NpgsqlTransaction)tx).RollbackAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
public QueueTask? GetNextTask(IDbTransaction tx)
|
public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
|
||||||
{
|
{
|
||||||
var sql = $@"select * from QUEUE where
|
var sql = $@"select * from QUEUE where
|
||||||
order by {nameof(QueueTask.DateCreated)}
|
order by {nameof(QueueTask.DateCreated)}
|
||||||
@@ -71,7 +71,7 @@ public class TaskRepository: ITaskRepository
|
|||||||
return _connection.QuerySingleOrDefault<QueueTask>(sql, tx);
|
return _connection.QuerySingleOrDefault<QueueTask>(sql, tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx)
|
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
|
||||||
{
|
{
|
||||||
var sql = $@"select * from QUEUE
|
var sql = $@"select * from QUEUE
|
||||||
order by {nameof(QueueTask.DateCreated)}
|
order by {nameof(QueueTask.DateCreated)}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using InServiceQue.Core.Models;
|
|||||||
using InServiceQue.Core.Repositories;
|
using InServiceQue.Core.Repositories;
|
||||||
using InServiceQue.InMemory;
|
using InServiceQue.InMemory;
|
||||||
using InServiceQue.Sample;
|
using InServiceQue.Sample;
|
||||||
|
using InServiceQue.Services;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
@@ -15,12 +16,12 @@ var app = builder.Build();
|
|||||||
|
|
||||||
app.MapGet("/", async (string msg) =>
|
app.MapGet("/", async (string msg) =>
|
||||||
{
|
{
|
||||||
var taskRepository = app.Services.GetService<ITaskRepository>();
|
var taskRepository = app.Services.GetService<IQueueClient>();
|
||||||
await taskRepository.InsertAsync(new QueueTask(new SendMessageTask(new SendMessagePayload()
|
await taskRepository.AddTaskAsync(new QueueTask(new SendMessageTask(new SendMessagePayload()
|
||||||
{
|
{
|
||||||
From = "John", To = "Esther", Message = msg
|
From = "John", To = "Esther", Message = msg
|
||||||
})));
|
})));
|
||||||
await taskRepository.InsertAsync(new QueueTask(new OtherMessageTask(msg)));
|
await taskRepository.AddTaskAsync(new QueueTask(new OtherMessageTask(msg)));
|
||||||
return new OkResult();
|
return new OkResult();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ public class InServiceQueBuilder
|
|||||||
{
|
{
|
||||||
_typeRegistry = new QueueTypeRegistry();
|
_typeRegistry = new QueueTypeRegistry();
|
||||||
_services.AddSingleton<ITypeRegistry>(_typeRegistry);
|
_services.AddSingleton<ITypeRegistry>(_typeRegistry);
|
||||||
|
_services.AddTransient<IQueueClient, QueueClient>();
|
||||||
_hostedServiceRegistrator = new HostedServiceRegistrator();
|
_hostedServiceRegistrator = new HostedServiceRegistrator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
9
InServiceQue/Services/IQueueClient.cs
Normal file
9
InServiceQue/Services/IQueueClient.cs
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
using InServiceQue.Core.Models;
|
||||||
|
|
||||||
|
namespace InServiceQue.Services;
|
||||||
|
|
||||||
|
public interface IQueueClient
|
||||||
|
{
|
||||||
|
void AddTask(IQueueTask task);
|
||||||
|
Task AddTaskAsync(IQueueTask task);
|
||||||
|
}
|
||||||
@@ -5,11 +5,9 @@ namespace InServiceQue.Services;
|
|||||||
|
|
||||||
public interface IQueueService: IHostedService
|
public interface IQueueService: IHostedService
|
||||||
{
|
{
|
||||||
void AddTask(IQueueTask task);
|
|
||||||
void TryProcessTask();
|
void TryProcessTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface IQueueService<T> : IQueueService where T : IQueueTask
|
public interface IQueueService<T> : IQueueService where T : IQueueTask
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
21
InServiceQue/Services/QueueClient.cs
Normal file
21
InServiceQue/Services/QueueClient.cs
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
using InServiceQue.Core.Models;
|
||||||
|
using InServiceQue.Core.Repositories;
|
||||||
|
|
||||||
|
namespace InServiceQue.Services;
|
||||||
|
|
||||||
|
public class QueueClient: IQueueClient
|
||||||
|
{
|
||||||
|
private readonly ITaskRepository _repository;
|
||||||
|
|
||||||
|
public void AddTask(IQueueTask task)
|
||||||
|
{
|
||||||
|
var queueTask = new QueueTask(task);
|
||||||
|
_repository.Insert(queueTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task AddTaskAsync(IQueueTask task)
|
||||||
|
{
|
||||||
|
var queueTask = new QueueTask(task);
|
||||||
|
await _repository.InsertAsync(queueTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using System.Diagnostics;
|
||||||
using InServiceQue.Core.Models;
|
using InServiceQue.Core.Models;
|
||||||
using InServiceQue.Core.Repositories;
|
using InServiceQue.Core.Repositories;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@@ -19,18 +20,6 @@ where T: IQueueTask
|
|||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void AddTask(IQueueTask task)
|
|
||||||
{
|
|
||||||
var queueTask = new QueueTask(task);
|
|
||||||
_repository.Insert(queueTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task AddTaskAsync(IQueueTask task)
|
|
||||||
{
|
|
||||||
var queueTask = new QueueTask(task);
|
|
||||||
await _repository.InsertAsync(queueTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void TryProcessTask()
|
public void TryProcessTask()
|
||||||
{
|
{
|
||||||
using (var scope = _serviceProvider.CreateScope())
|
using (var scope = _serviceProvider.CreateScope())
|
||||||
@@ -44,6 +33,8 @@ where T: IQueueTask
|
|||||||
}
|
}
|
||||||
|
|
||||||
var taskType = queueTask.TaskType;
|
var taskType = queueTask.TaskType;
|
||||||
|
var ttype = typeof(T);
|
||||||
|
Debug.WriteLine($"TType: {ttype}, TaskType: {taskType}");
|
||||||
var handler = _typeRegistry.GetService(scope, taskType);
|
var handler = _typeRegistry.GetService(scope, taskType);
|
||||||
|
|
||||||
try
|
try
|
||||||
@@ -64,15 +55,18 @@ where T: IQueueTask
|
|||||||
using (var scope = _serviceProvider.CreateScope())
|
using (var scope = _serviceProvider.CreateScope())
|
||||||
{
|
{
|
||||||
using var tx = await _repository.StartTransactionAsync();
|
using var tx = await _repository.StartTransactionAsync();
|
||||||
var queueTask = await _repository.GetNextTaskAsync(tx);
|
|
||||||
|
var taskType = typeof(T);
|
||||||
|
|
||||||
|
var queueTask = await _repository.GetNextTaskAsync(tx, taskType.Name);
|
||||||
if (queueTask == null)
|
if (queueTask == null)
|
||||||
{
|
{
|
||||||
await _repository.RollbackTransactionAsync(tx);
|
await _repository.RollbackTransactionAsync(tx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskType = queueTask.TaskType;
|
var serviceHandler = scope.ServiceProvider.GetService(typeof(IQueueHandler<>).MakeGenericType(taskType));
|
||||||
var handler = _typeRegistry.GetService(scope, taskType);
|
var handler = (IQueueHandler)serviceHandler;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ public class QueueTypeRegistry: ITypeRegistry
|
|||||||
public IQueueHandler GetService(IServiceScope scope, string taskType)
|
public IQueueHandler GetService(IServiceScope scope, string taskType)
|
||||||
{
|
{
|
||||||
var classType = _typeRegistry[taskType];
|
var classType = _typeRegistry[taskType];
|
||||||
|
|
||||||
var service = scope.ServiceProvider.GetService(classType);
|
var service = scope.ServiceProvider.GetService(classType);
|
||||||
return (IQueueHandler)service;
|
return (IQueueHandler)service;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user