Compare commits

..

3 Commits

10 changed files with 131 additions and 40 deletions

View File

@@ -13,8 +13,8 @@ public interface ITaskRepository: IDisposable
Task CommitTransactionAsync(IDbTransaction tx); Task CommitTransactionAsync(IDbTransaction tx);
void RollbackTransaction(IDbTransaction tx); void RollbackTransaction(IDbTransaction tx);
Task RollbackTransactionAsync(IDbTransaction tx); Task RollbackTransactionAsync(IDbTransaction tx);
QueueTask? GetNextTask(IDbTransaction tx); QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null);
Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx); Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null);
void SaveTask(QueueTask task, IDbTransaction tx); void SaveTask(QueueTask task, IDbTransaction tx);
Task SaveTaskAsync(QueueTask task, IDbTransaction tx); Task SaveTaskAsync(QueueTask task, IDbTransaction tx);
} }

View File

@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using System.Data; using System.Data;
using InServiceQue.Core.Models; using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories; using InServiceQue.Core.Repositories;
@@ -7,7 +6,9 @@ namespace InServiceQue.InMemory;
public class TaskRepositoryInMemory: ITaskRepository public class TaskRepositoryInMemory: ITaskRepository
{ {
private static ConcurrentQueue<QueueTask> _que = new(); private static object _lockObject = new();
private static int _currentShard;
private static Dictionary<string, Queue<QueueTask>> _quesByType = new();
public void Dispose() public void Dispose()
{ {
// TODO release managed resources here // TODO release managed resources here
@@ -15,7 +16,18 @@ public class TaskRepositoryInMemory: ITaskRepository
public void Insert(QueueTask task) public void Insert(QueueTask task)
{ {
_que.Enqueue(task); lock (_lockObject)
{
if (_quesByType.ContainsKey(task.TaskType))
{
_quesByType[task.TaskType].Enqueue(task);
}
else
{
_quesByType.Add(task.TaskType, new Queue<QueueTask>(new []{task}));
}
}
} }
public async Task InsertAsync(QueueTask task) public async Task InsertAsync(QueueTask task)
@@ -54,19 +66,65 @@ public class TaskRepositoryInMemory: ITaskRepository
await Task.CompletedTask; await Task.CompletedTask;
} }
public QueueTask? GetNextTask(IDbTransaction tx) public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
{ {
QueueTask? task; if (taskType == null)
_que.TryDequeue(out task); return GetNextTaskRolling();
QueueTask? task = null;
if (_quesByType.TryGetValue(taskType, out var que))
{
que.TryDequeue(out task);
}
return task; return task;
} }
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx) public QueueTask? GetNextTaskRolling()
{ {
QueueTask? task = null; QueueTask? task = null;
_que.TryDequeue(out task); lock (_lockObject)
{
var topicsCount = _quesByType.Count;
for (int i = 0; i < topicsCount; i++)
{
if (i == _currentShard)
{
var keyValuePair = _quesByType.ToArray()[i];
var que = keyValuePair.Value;
task = que.Dequeue();
if (!que.Any())
{
_quesByType.Remove(keyValuePair.Key);
}
break;
}
}
_currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1;
}
return task;
}
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
{
if (taskType == null)
return await Task.FromResult(GetNextTaskRolling());
QueueTask? task = null;
lock (_lockObject)
{
if (_quesByType.TryGetValue(taskType, out var que))
{
que.TryDequeue(out task);
}
}
return await Task.FromResult(task); return await Task.FromResult(task);
} }

View File

@@ -61,7 +61,7 @@ public class TaskRepository: ITaskRepository
await ((NpgsqlTransaction)tx).RollbackAsync(); await ((NpgsqlTransaction)tx).RollbackAsync();
} }
public QueueTask? GetNextTask(IDbTransaction tx) public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
{ {
var sql = $@"select * from QUEUE where var sql = $@"select * from QUEUE where
order by {nameof(QueueTask.DateCreated)} order by {nameof(QueueTask.DateCreated)}
@@ -71,7 +71,7 @@ public class TaskRepository: ITaskRepository
return _connection.QuerySingleOrDefault<QueueTask>(sql, tx); return _connection.QuerySingleOrDefault<QueueTask>(sql, tx);
} }
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx) public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
{ {
var sql = $@"select * from QUEUE var sql = $@"select * from QUEUE
order by {nameof(QueueTask.DateCreated)} order by {nameof(QueueTask.DateCreated)}

View File

@@ -3,6 +3,8 @@ using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories; using InServiceQue.Core.Repositories;
using InServiceQue.InMemory; using InServiceQue.InMemory;
using InServiceQue.Sample; using InServiceQue.Sample;
using InServiceQue.Services;
using Microsoft.AspNetCore.Http.HttpResults;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
@@ -15,12 +17,18 @@ var app = builder.Build();
app.MapGet("/", async (string msg) => app.MapGet("/", async (string msg) =>
{ {
var taskRepository = app.Services.GetService<ITaskRepository>(); if (!string.IsNullOrEmpty(msg))
await taskRepository.InsertAsync(new QueueTask(new SendMessageTask(new SendMessagePayload()
{ {
From = "John", To = "Esther", Message = msg var taskRepository = app.Services.GetService<IQueueClient>();
}))); await taskRepository.AddTaskAsync(new SendMessageTask(new SendMessagePayload()
await taskRepository.InsertAsync(new QueueTask(new OtherMessageTask(msg))); {
From = "John", To = "Esther", Message = msg
}));
await taskRepository.AddTaskAsync(new OtherMessageTask(msg));
return new OkResult();
}
return new OkResult(); return new OkResult();
}); });

View File

@@ -20,6 +20,7 @@ public class InServiceQueBuilder
{ {
_typeRegistry = new QueueTypeRegistry(); _typeRegistry = new QueueTypeRegistry();
_services.AddSingleton<ITypeRegistry>(_typeRegistry); _services.AddSingleton<ITypeRegistry>(_typeRegistry);
_services.AddTransient<IQueueClient, QueueClient>();
_hostedServiceRegistrator = new HostedServiceRegistrator(); _hostedServiceRegistrator = new HostedServiceRegistrator();
} }

View File

@@ -0,0 +1,9 @@
using InServiceQue.Core.Models;
namespace InServiceQue.Services;
public interface IQueueClient
{
void AddTask(IQueueTask task);
Task AddTaskAsync(IQueueTask task);
}

View File

@@ -5,11 +5,9 @@ namespace InServiceQue.Services;
public interface IQueueService: IHostedService public interface IQueueService: IHostedService
{ {
void AddTask(IQueueTask task);
void TryProcessTask(); void TryProcessTask();
} }
public interface IQueueService<T> : IQueueService where T : IQueueTask public interface IQueueService<T> : IQueueService where T : IQueueTask
{ {
} }

View File

@@ -0,0 +1,26 @@
using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories;
namespace InServiceQue.Services;
public class QueueClient: IQueueClient
{
private readonly ITaskRepository _repository;
public QueueClient(ITaskRepository repository)
{
_repository = repository;
}
public void AddTask(IQueueTask task)
{
var queueTask = new QueueTask(task);
_repository.Insert(queueTask);
}
public async Task AddTaskAsync(IQueueTask task)
{
var queueTask = new QueueTask(task);
await _repository.InsertAsync(queueTask);
}
}

View File

@@ -1,3 +1,4 @@
using System.Diagnostics;
using InServiceQue.Core.Models; using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories; using InServiceQue.Core.Repositories;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@@ -8,43 +9,29 @@ namespace InServiceQue.Services;
public class QueueService<T> : BackgroundService, IQueueService<T> public class QueueService<T> : BackgroundService, IQueueService<T>
where T: IQueueTask where T: IQueueTask
{ {
private readonly ITypeRegistry _typeRegistry;
private readonly ITaskRepository _repository; private readonly ITaskRepository _repository;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
public QueueService(ITypeRegistry typeRegistry, ITaskRepository repository, IServiceProvider serviceProvider) public QueueService(ITaskRepository repository, IServiceProvider serviceProvider)
{ {
_typeRegistry = typeRegistry;
_repository = repository; _repository = repository;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
public void AddTask(IQueueTask task)
{
var queueTask = new QueueTask(task);
_repository.Insert(queueTask);
}
public async Task AddTaskAsync(IQueueTask task)
{
var queueTask = new QueueTask(task);
await _repository.InsertAsync(queueTask);
}
public void TryProcessTask() public void TryProcessTask()
{ {
using (var scope = _serviceProvider.CreateScope()) using (var scope = _serviceProvider.CreateScope())
{ {
using var tx = _repository.StartTransaction(); using var tx = _repository.StartTransaction();
var queueTask = _repository.GetNextTask(tx); var queueTask = _repository.GetNextTask(tx, typeof(T).Name);
if (queueTask == null) if (queueTask == null)
{ {
_repository.RollbackTransaction(tx); _repository.RollbackTransaction(tx);
return; return;
} }
var taskType = queueTask.TaskType; var serviceHandler = scope.ServiceProvider.GetService(typeof(IQueueHandler<>).MakeGenericType(typeof(T)));
var handler = _typeRegistry.GetService(scope, taskType); var handler = (IQueueHandler)serviceHandler;
try try
{ {
@@ -64,15 +51,18 @@ where T: IQueueTask
using (var scope = _serviceProvider.CreateScope()) using (var scope = _serviceProvider.CreateScope())
{ {
using var tx = await _repository.StartTransactionAsync(); using var tx = await _repository.StartTransactionAsync();
var queueTask = await _repository.GetNextTaskAsync(tx);
var taskType = typeof(T);
var queueTask = await _repository.GetNextTaskAsync(tx, taskType.Name);
if (queueTask == null) if (queueTask == null)
{ {
await _repository.RollbackTransactionAsync(tx); await _repository.RollbackTransactionAsync(tx);
return; return;
} }
var taskType = queueTask.TaskType; var serviceHandler = scope.ServiceProvider.GetService(typeof(IQueueHandler<>).MakeGenericType(taskType));
var handler = _typeRegistry.GetService(scope, taskType); var handler = (IQueueHandler)serviceHandler;
try try
{ {

View File

@@ -14,6 +14,7 @@ public class QueueTypeRegistry: ITypeRegistry
public IQueueHandler GetService(IServiceScope scope, string taskType) public IQueueHandler GetService(IServiceScope scope, string taskType)
{ {
var classType = _typeRegistry[taskType]; var classType = _typeRegistry[taskType];
var service = scope.ServiceProvider.GetService(classType); var service = scope.ServiceProvider.GetService(classType);
return (IQueueHandler)service; return (IQueueHandler)service;
} }