From 5d80c6351c676e70feb260719ea98ac00db2ba73 Mon Sep 17 00:00:00 2001 From: HiveBeats Date: Sun, 10 Mar 2024 00:43:57 +0700 Subject: [PATCH] Initial Commit; --- .gitignore | 5 + .idea/.idea.InServiceQue/.idea/.gitignore | 13 ++ .idea/.idea.InServiceQue/.idea/encodings.xml | 4 + .../.idea.InServiceQue/.idea/indexLayout.xml | 8 ++ .idea/.idea.InServiceQue/.idea/vcs.xml | 6 + InServiceQue.Core/InServiceQue.Core.csproj | 13 ++ InServiceQue.Core/Models/IQueueTask.cs | 7 + InServiceQue.Core/Models/QueueTask.cs | 45 +++++++ .../Repositories/ITaskRepository.cs | 20 +++ .../InServiceQue.InMemory.csproj | 13 ++ .../TaskRepositoryInMemory.cs | 82 ++++++++++++ .../InServiceQue.Postgres.csproj | 18 +++ InServiceQue.Postgres/TaskRepository.cs | 122 ++++++++++++++++++ InServiceQue.Sample/DIExtensions.cs | 56 ++++++++ .../HostedServiceRegistrator.cs | 19 +++ .../InServiceQue.Sample.csproj | 16 +++ InServiceQue.Sample/Program.cs | 14 ++ .../Properties/launchSettings.json | 37 ++++++ InServiceQue.Sample/SendMessageHandler.cs | 17 +++ InServiceQue.Sample/SendMessageTask.cs | 33 +++++ .../appsettings.Development.json | 8 ++ InServiceQue.Sample/appsettings.json | 9 ++ InServiceQue.sln | 40 ++++++ InServiceQue/InServiceQue.csproj | 17 +++ InServiceQue/Services/IQueueHandler.cs | 14 ++ InServiceQue/Services/IQueueService.cs | 15 +++ InServiceQue/Services/ITypeRegistry.cs | 9 ++ InServiceQue/Services/QueueService.cs | 118 +++++++++++++++++ InServiceQue/Services/QueueTypeRegistry.cs | 19 +++ 29 files changed, 797 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.idea.InServiceQue/.idea/.gitignore create mode 100644 .idea/.idea.InServiceQue/.idea/encodings.xml create mode 100644 .idea/.idea.InServiceQue/.idea/indexLayout.xml create mode 100644 .idea/.idea.InServiceQue/.idea/vcs.xml create mode 100644 InServiceQue.Core/InServiceQue.Core.csproj create mode 100644 InServiceQue.Core/Models/IQueueTask.cs create mode 100644 InServiceQue.Core/Models/QueueTask.cs create mode 100644 InServiceQue.Core/Repositories/ITaskRepository.cs create mode 100644 InServiceQue.InMemory/InServiceQue.InMemory.csproj create mode 100644 InServiceQue.InMemory/TaskRepositoryInMemory.cs create mode 100644 InServiceQue.Postgres/InServiceQue.Postgres.csproj create mode 100644 InServiceQue.Postgres/TaskRepository.cs create mode 100644 InServiceQue.Sample/DIExtensions.cs create mode 100644 InServiceQue.Sample/HostedServiceRegistrator.cs create mode 100644 InServiceQue.Sample/InServiceQue.Sample.csproj create mode 100644 InServiceQue.Sample/Program.cs create mode 100644 InServiceQue.Sample/Properties/launchSettings.json create mode 100644 InServiceQue.Sample/SendMessageHandler.cs create mode 100644 InServiceQue.Sample/SendMessageTask.cs create mode 100644 InServiceQue.Sample/appsettings.Development.json create mode 100644 InServiceQue.Sample/appsettings.json create mode 100644 InServiceQue.sln create mode 100644 InServiceQue/InServiceQue.csproj create mode 100644 InServiceQue/Services/IQueueHandler.cs create mode 100644 InServiceQue/Services/IQueueService.cs create mode 100644 InServiceQue/Services/ITypeRegistry.cs create mode 100644 InServiceQue/Services/QueueService.cs create mode 100644 InServiceQue/Services/QueueTypeRegistry.cs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..add57be --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +bin/ +obj/ +/packages/ +riderModule.iml +/_ReSharper.Caches/ \ No newline at end of file diff --git a/.idea/.idea.InServiceQue/.idea/.gitignore b/.idea/.idea.InServiceQue/.idea/.gitignore new file mode 100644 index 0000000..0beccdb --- /dev/null +++ b/.idea/.idea.InServiceQue/.idea/.gitignore @@ -0,0 +1,13 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Rider ignored files +/modules.xml +/contentModel.xml +/projectSettingsUpdater.xml +/.idea.InServiceQue.iml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.idea.InServiceQue/.idea/encodings.xml b/.idea/.idea.InServiceQue/.idea/encodings.xml new file mode 100644 index 0000000..df87cf9 --- /dev/null +++ b/.idea/.idea.InServiceQue/.idea/encodings.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/.idea.InServiceQue/.idea/indexLayout.xml b/.idea/.idea.InServiceQue/.idea/indexLayout.xml new file mode 100644 index 0000000..7b08163 --- /dev/null +++ b/.idea/.idea.InServiceQue/.idea/indexLayout.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/.idea.InServiceQue/.idea/vcs.xml b/.idea/.idea.InServiceQue/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/.idea.InServiceQue/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/InServiceQue.Core/InServiceQue.Core.csproj b/InServiceQue.Core/InServiceQue.Core.csproj new file mode 100644 index 0000000..60f11f0 --- /dev/null +++ b/InServiceQue.Core/InServiceQue.Core.csproj @@ -0,0 +1,13 @@ + + + + net7.0 + enable + enable + + + + + + + diff --git a/InServiceQue.Core/Models/IQueueTask.cs b/InServiceQue.Core/Models/IQueueTask.cs new file mode 100644 index 0000000..5835635 --- /dev/null +++ b/InServiceQue.Core/Models/IQueueTask.cs @@ -0,0 +1,7 @@ +namespace InServiceQue.Core.Models; + +public interface IQueueTask +{ + string GetTypeString(); + string GetPayloadString(); +} \ No newline at end of file diff --git a/InServiceQue.Core/Models/QueueTask.cs b/InServiceQue.Core/Models/QueueTask.cs new file mode 100644 index 0000000..242a8a1 --- /dev/null +++ b/InServiceQue.Core/Models/QueueTask.cs @@ -0,0 +1,45 @@ +namespace InServiceQue.Core.Models; + +public class QueueTask: IQueueTask +{ + private string _type; + public Guid Id { get; init; } + public string TaskType { get; init; } = default!; + public DateTime DateCreated { get; init; } + public DateTime? DateProcessed { get; private set; } + public DateTime? DateClosed { get; private set; } + public int Attempts { get; private set; } + public string? Payload { get; init; } + + + public QueueTask(string taskType, string? payload) + { + Id = Guid.NewGuid(); + TaskType = taskType; + DateCreated = DateTime.UtcNow; + Payload = payload; + } + + public QueueTask(IQueueTask task): this(task.GetTypeString(), task.GetPayloadString()) { } + + public void MarkAttempt() + { + DateProcessed = DateTime.UtcNow; + Attempts++; + } + + public void SolveTask() + { + DateClosed = DateTime.UtcNow; + } + + string IQueueTask.GetTypeString() + { + return TaskType; + } + + string IQueueTask.GetPayloadString() + { + return Payload ?? string.Empty; + } +} \ No newline at end of file diff --git a/InServiceQue.Core/Repositories/ITaskRepository.cs b/InServiceQue.Core/Repositories/ITaskRepository.cs new file mode 100644 index 0000000..dc066be --- /dev/null +++ b/InServiceQue.Core/Repositories/ITaskRepository.cs @@ -0,0 +1,20 @@ +using System.Data; +using InServiceQue.Core.Models; + +namespace InServiceQue.Core.Repositories; + +public interface ITaskRepository: IDisposable +{ + void Insert(QueueTask task); + Task InsertAsync(QueueTask task); + IDbTransaction StartTransaction(); + Task StartTransactionAsync(); + void CommitTransaction(IDbTransaction tx); + Task CommitTransactionAsync(IDbTransaction tx); + void RollbackTransaction(IDbTransaction tx); + Task RollbackTransactionAsync(IDbTransaction tx); + QueueTask? GetNextTask(IDbTransaction tx); + Task GetNextTaskAsync(IDbTransaction tx); + void SaveTask(QueueTask task, IDbTransaction tx); + Task SaveTaskAsync(QueueTask task, IDbTransaction tx); +} \ No newline at end of file diff --git a/InServiceQue.InMemory/InServiceQue.InMemory.csproj b/InServiceQue.InMemory/InServiceQue.InMemory.csproj new file mode 100644 index 0000000..6a390b9 --- /dev/null +++ b/InServiceQue.InMemory/InServiceQue.InMemory.csproj @@ -0,0 +1,13 @@ + + + + net7.0 + enable + enable + + + + + + + diff --git a/InServiceQue.InMemory/TaskRepositoryInMemory.cs b/InServiceQue.InMemory/TaskRepositoryInMemory.cs new file mode 100644 index 0000000..5f97efd --- /dev/null +++ b/InServiceQue.InMemory/TaskRepositoryInMemory.cs @@ -0,0 +1,82 @@ +using System.Collections.Concurrent; +using System.Data; +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; + +namespace InServiceQue.InMemory; + +public class TaskRepositoryInMemory: ITaskRepository +{ + private static ConcurrentQueue _que = new(); + public void Dispose() + { + // TODO release managed resources here + } + + public void Insert(QueueTask task) + { + _que.Enqueue(task); + } + + public async Task InsertAsync(QueueTask task) + { + Insert(task); + await Task.CompletedTask; + } + + public IDbTransaction StartTransaction() + { + return null; + } + + public async Task StartTransactionAsync() + { + return await Task.FromResult(null); + } + + public void CommitTransaction(IDbTransaction tx) + { + + } + + public async Task CommitTransactionAsync(IDbTransaction tx) + { + await Task.CompletedTask; + } + + public void RollbackTransaction(IDbTransaction tx) + { + + } + + public async Task RollbackTransactionAsync(IDbTransaction tx) + { + await Task.CompletedTask; + } + + public QueueTask? GetNextTask(IDbTransaction tx) + { + QueueTask? task; + _que.TryDequeue(out task); + + return task; + } + + public async Task GetNextTaskAsync(IDbTransaction tx) + { + QueueTask? task = null; + _que.TryDequeue(out task); + + return await Task.FromResult(task); + } + + public void SaveTask(QueueTask task, IDbTransaction tx) + { + + } + + public async Task SaveTaskAsync(QueueTask task, IDbTransaction tx) + { + await Task.CompletedTask; + } +} \ No newline at end of file diff --git a/InServiceQue.Postgres/InServiceQue.Postgres.csproj b/InServiceQue.Postgres/InServiceQue.Postgres.csproj new file mode 100644 index 0000000..5ceaf55 --- /dev/null +++ b/InServiceQue.Postgres/InServiceQue.Postgres.csproj @@ -0,0 +1,18 @@ + + + + net7.0 + enable + enable + + + + + + + + + + + + diff --git a/InServiceQue.Postgres/TaskRepository.cs b/InServiceQue.Postgres/TaskRepository.cs new file mode 100644 index 0000000..fe6d682 --- /dev/null +++ b/InServiceQue.Postgres/TaskRepository.cs @@ -0,0 +1,122 @@ +using System.Data; +using Dapper; +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; +using Npgsql; + +namespace InServiceQue.Postgres; + +public class TaskRepository: ITaskRepository +{ + private readonly NpgsqlConnection _connection; + + public TaskRepository(string connectionString) + { + _connection = new NpgsqlConnection(connectionString); + } + + public void Insert(QueueTask task) + { + var sql = + $"INSERT INTO QUEUE({nameof(QueueTask.Id)}, {nameof(QueueTask.TaskType)}, {nameof(QueueTask.DateCreated)}, {nameof(QueueTask.Payload)}) VALUES (@{nameof(QueueTask.Id)}, @{nameof(QueueTask.TaskType)}, @{nameof(QueueTask.DateCreated)}, @{nameof(QueueTask.Payload)})"; + + _connection.Execute(sql, task); + } + + public async Task InsertAsync(QueueTask task) + { + var sql = + $"INSERT INTO QUEUE({nameof(QueueTask.Id)}, {nameof(QueueTask.TaskType)}, {nameof(QueueTask.DateCreated)}, {nameof(QueueTask.Payload)}) VALUES (@{nameof(QueueTask.Id)}, @{nameof(QueueTask.TaskType)}, @{nameof(QueueTask.DateCreated)}, @{nameof(QueueTask.Payload)})"; + + await _connection.ExecuteAsync(sql, task); + } + + public IDbTransaction StartTransaction() + { + return _connection.BeginTransaction(); + } + + public async Task StartTransactionAsync() + { + return await _connection.BeginTransactionAsync(); + } + + public void CommitTransaction(IDbTransaction tx) + { + tx.Commit(); + } + + public async Task CommitTransactionAsync(IDbTransaction tx) + { + await ((NpgsqlTransaction)tx).CommitAsync(); + } + + public void RollbackTransaction(IDbTransaction tx) + { + tx.Rollback(); + } + + public async Task RollbackTransactionAsync(IDbTransaction tx) + { + await ((NpgsqlTransaction)tx).RollbackAsync(); + } + + public QueueTask? GetNextTask(IDbTransaction tx) + { + var sql = $@"select * from QUEUE where + order by {nameof(QueueTask.DateCreated)} + limit 1 + for update skip locked"; + + return _connection.QuerySingleOrDefault(sql, tx); + } + + public async Task GetNextTaskAsync(IDbTransaction tx) + { + var sql = $@"select * from QUEUE + order by {nameof(QueueTask.DateCreated)} + limit 1 + for update skip locked"; + + return await _connection.QuerySingleOrDefaultAsync(sql, tx); + } + + public void SaveTask(QueueTask task, IDbTransaction tx) + { + var sql = $@"UPDATE QUEUE + SET + {nameof(QueueTask.DateProcessed)} = @{nameof(QueueTask.DateProcessed)}, + {nameof(QueueTask.DateClosed)} = @{nameof(QueueTask.DateClosed)}, + {nameof(QueueTask.Attempts)} = @{nameof(QueueTask.Attempts)}, + {nameof(QueueTask.Payload)} = @{nameof(QueueTask.Payload)}"; + + _connection.Execute(sql, task, tx); + } + + public async Task SaveTaskAsync(QueueTask task, IDbTransaction tx) + { + var sql = $@"UPDATE QUEUE + SET {nameof(QueueTask.DateProcessed)} = @{nameof(QueueTask.DateProcessed)}, + {nameof(QueueTask.DateClosed)} = @{nameof(QueueTask.DateClosed)}, + {nameof(QueueTask.Attempts)} = @{nameof(QueueTask.Attempts)}, + {nameof(QueueTask.Payload)} = @{nameof(QueueTask.Payload)}"; + + await _connection.ExecuteAsync(sql, task, tx); + } + + private void ReleaseUnmanagedResources() + { + _connection.Dispose(); + } + + public void Dispose() + { + ReleaseUnmanagedResources(); + GC.SuppressFinalize(this); + } + + ~TaskRepository() + { + ReleaseUnmanagedResources(); + } +} \ No newline at end of file diff --git a/InServiceQue.Sample/DIExtensions.cs b/InServiceQue.Sample/DIExtensions.cs new file mode 100644 index 0000000..b51bcbb --- /dev/null +++ b/InServiceQue.Sample/DIExtensions.cs @@ -0,0 +1,56 @@ +using System.Reflection; +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; +using InServiceQue.InMemory; +using InServiceQue.Services; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace InServiceQue.Sample; + +public static class DIExtensions +{ + public static IServiceCollection RegisterInternals(this IServiceCollection services) + { + services.AddSingleton(); + services.AddTransient(); + return services; + } + + public static IServiceCollection RegisterQueues(this IServiceCollection services) + { + using var sp = services.BuildServiceProvider(); + // find all types in the assembly that implement IQueueHandler + var queueTypes = Assembly.GetExecutingAssembly().GetTypes() + .Where(t => t.IsClass && !t.IsAbstract && t.GetInterfaces() + .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IQueueHandler<>))); + var hostedServiceRegistrator = new HostedServiceRegistrator(); + + // register each query type with its corresponding interface + foreach (var queueType in queueTypes) + { + // get the T from IQueueHandler + var type = queueType.GetInterfaces() + .Single(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IQueueHandler<>)); + + // register the query type as a scoped service with its corresponding interface + services.AddScoped(queueType); + + var typeRegistry = sp.GetRequiredService(); + typeRegistry.RegisterTaskType(type.GenericTypeArguments.First().Name, type); + + services.AddScoped(typeof(IQueueHandler<>) + .MakeGenericType(type.GenericTypeArguments), queueType); + + //todo: bug here + var hostedServiceType = typeof(QueueService<>).MakeGenericType(type.GenericTypeArguments); + hostedServiceRegistrator.RegisterHostedService(services, hostedServiceType); + + // services.AddSingleton(typeof(IQueueService<>) + // .MakeGenericType(type.GenericTypeArguments), ); + } + return services; + + } + + +} \ No newline at end of file diff --git a/InServiceQue.Sample/HostedServiceRegistrator.cs b/InServiceQue.Sample/HostedServiceRegistrator.cs new file mode 100644 index 0000000..be8ee7a --- /dev/null +++ b/InServiceQue.Sample/HostedServiceRegistrator.cs @@ -0,0 +1,19 @@ +using System.Reflection; + +namespace InServiceQue.Sample; + +public class HostedServiceRegistrator +{ + public void RegisterHostedService(IServiceCollection services, Type hostedServiceType) + { + Type servicesType = typeof(HostedServiceRegistrator); + MethodInfo methodInfo = servicesType.GetMethod("AddHostedService"); + MethodInfo genericMethod = methodInfo.MakeGenericMethod(hostedServiceType); + genericMethod.Invoke(this, new object[] { services }); + } + + // Needed as a work-arround because we can't call the extension method with reflection. + public IServiceCollection AddHostedService(IServiceCollection services) + where THostedService : class, IHostedService => + services.AddHostedService(); +} \ No newline at end of file diff --git a/InServiceQue.Sample/InServiceQue.Sample.csproj b/InServiceQue.Sample/InServiceQue.Sample.csproj new file mode 100644 index 0000000..ac20e04 --- /dev/null +++ b/InServiceQue.Sample/InServiceQue.Sample.csproj @@ -0,0 +1,16 @@ + + + + net7.0 + enable + enable + + + + + + + + + + diff --git a/InServiceQue.Sample/Program.cs b/InServiceQue.Sample/Program.cs new file mode 100644 index 0000000..607fcb4 --- /dev/null +++ b/InServiceQue.Sample/Program.cs @@ -0,0 +1,14 @@ +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; +using InServiceQue.Sample; + +var builder = WebApplication.CreateBuilder(args); +builder.Services.RegisterInternals(); +builder.Services.RegisterQueues(); +var app = builder.Build(); + + + +app.MapGet("/", (string msg) => app.Services.GetService().Insert(new QueueTask(new SendMessageTask(new SendMessagePayload(){To = "John", From = "Garry", Message = msg})))); + +app.Run(); \ No newline at end of file diff --git a/InServiceQue.Sample/Properties/launchSettings.json b/InServiceQue.Sample/Properties/launchSettings.json new file mode 100644 index 0000000..474b9b0 --- /dev/null +++ b/InServiceQue.Sample/Properties/launchSettings.json @@ -0,0 +1,37 @@ +{ + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:61951", + "sslPort": 44361 + } + }, + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:5121", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "https://localhost:7149;http://localhost:5121", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/InServiceQue.Sample/SendMessageHandler.cs b/InServiceQue.Sample/SendMessageHandler.cs new file mode 100644 index 0000000..9c3ae63 --- /dev/null +++ b/InServiceQue.Sample/SendMessageHandler.cs @@ -0,0 +1,17 @@ +using InServiceQue.Services; + +namespace InServiceQue.Sample; + +public class SendMessageHandler: IQueueHandler +{ + public bool Handle(string payload) + { + throw new NotImplementedException(); + } + + public Task HandleAsync(string payload) + { + Console.WriteLine(payload); + return Task.FromResult(true); + } +} \ No newline at end of file diff --git a/InServiceQue.Sample/SendMessageTask.cs b/InServiceQue.Sample/SendMessageTask.cs new file mode 100644 index 0000000..1a9ff5d --- /dev/null +++ b/InServiceQue.Sample/SendMessageTask.cs @@ -0,0 +1,33 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using InServiceQue.Core.Models; + +namespace InServiceQue.Sample; + + +public class SendMessagePayload +{ + public string From { get; set; } + public string To { get; set; } + public string Message { get; set; } +} + +public class SendMessageTask: IQueueTask +{ + private SendMessagePayload _payload; + + public SendMessageTask(SendMessagePayload payload) + { + _payload = payload; + } + + public string GetTypeString() + { + return nameof(SendMessageTask); + } + + public string GetPayloadString() + { + return JsonSerializer.Serialize(_payload); + } +} \ No newline at end of file diff --git a/InServiceQue.Sample/appsettings.Development.json b/InServiceQue.Sample/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/InServiceQue.Sample/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/InServiceQue.Sample/appsettings.json b/InServiceQue.Sample/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/InServiceQue.Sample/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/InServiceQue.sln b/InServiceQue.sln new file mode 100644 index 0000000..3a36b52 --- /dev/null +++ b/InServiceQue.sln @@ -0,0 +1,40 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InServiceQue.Core", "InServiceQue.Core\InServiceQue.Core.csproj", "{28B85073-3A07-41E0-9B94-9738280C63A6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InServiceQue.Postgres", "InServiceQue.Postgres\InServiceQue.Postgres.csproj", "{CF132CFC-3FEA-470E-B404-5FB9C4F3100C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InServiceQue", "InServiceQue\InServiceQue.csproj", "{42B7AE80-3C8C-478A-BC7F-0E86CCBC8AD9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InServiceQue.Sample", "InServiceQue.Sample\InServiceQue.Sample.csproj", "{8B68480F-D518-4DF8-BCF1-E77596B47B09}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InServiceQue.InMemory", "InServiceQue.InMemory\InServiceQue.InMemory.csproj", "{97879D5B-FE10-4153-81DD-6B922929C993}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {28B85073-3A07-41E0-9B94-9738280C63A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {28B85073-3A07-41E0-9B94-9738280C63A6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {28B85073-3A07-41E0-9B94-9738280C63A6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {28B85073-3A07-41E0-9B94-9738280C63A6}.Release|Any CPU.Build.0 = Release|Any CPU + {CF132CFC-3FEA-470E-B404-5FB9C4F3100C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CF132CFC-3FEA-470E-B404-5FB9C4F3100C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CF132CFC-3FEA-470E-B404-5FB9C4F3100C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CF132CFC-3FEA-470E-B404-5FB9C4F3100C}.Release|Any CPU.Build.0 = Release|Any CPU + {42B7AE80-3C8C-478A-BC7F-0E86CCBC8AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {42B7AE80-3C8C-478A-BC7F-0E86CCBC8AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {42B7AE80-3C8C-478A-BC7F-0E86CCBC8AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {42B7AE80-3C8C-478A-BC7F-0E86CCBC8AD9}.Release|Any CPU.Build.0 = Release|Any CPU + {8B68480F-D518-4DF8-BCF1-E77596B47B09}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8B68480F-D518-4DF8-BCF1-E77596B47B09}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8B68480F-D518-4DF8-BCF1-E77596B47B09}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8B68480F-D518-4DF8-BCF1-E77596B47B09}.Release|Any CPU.Build.0 = Release|Any CPU + {97879D5B-FE10-4153-81DD-6B922929C993}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {97879D5B-FE10-4153-81DD-6B922929C993}.Debug|Any CPU.Build.0 = Debug|Any CPU + {97879D5B-FE10-4153-81DD-6B922929C993}.Release|Any CPU.ActiveCfg = Release|Any CPU + {97879D5B-FE10-4153-81DD-6B922929C993}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/InServiceQue/InServiceQue.csproj b/InServiceQue/InServiceQue.csproj new file mode 100644 index 0000000..a58f78b --- /dev/null +++ b/InServiceQue/InServiceQue.csproj @@ -0,0 +1,17 @@ + + + + net7.0 + enable + enable + + + + + + + + + + + diff --git a/InServiceQue/Services/IQueueHandler.cs b/InServiceQue/Services/IQueueHandler.cs new file mode 100644 index 0000000..e24869b --- /dev/null +++ b/InServiceQue/Services/IQueueHandler.cs @@ -0,0 +1,14 @@ +using InServiceQue.Core.Models; + +namespace InServiceQue.Services; + +public interface IQueueHandler +{ + bool Handle(string payload); + Task HandleAsync(string payload); +} + +public interface IQueueHandler: IQueueHandler + where T: IQueueTask +{ +} \ No newline at end of file diff --git a/InServiceQue/Services/IQueueService.cs b/InServiceQue/Services/IQueueService.cs new file mode 100644 index 0000000..41f21d6 --- /dev/null +++ b/InServiceQue/Services/IQueueService.cs @@ -0,0 +1,15 @@ +using InServiceQue.Core.Models; +using Microsoft.Extensions.Hosting; + +namespace InServiceQue.Services; + +public interface IQueueService: IHostedService +{ + void AddTask(IQueueTask task); + void TryProcessTask(); +} + +public interface IQueueService : IQueueService where T : IQueueTask +{ + +} \ No newline at end of file diff --git a/InServiceQue/Services/ITypeRegistry.cs b/InServiceQue/Services/ITypeRegistry.cs new file mode 100644 index 0000000..e2d4458 --- /dev/null +++ b/InServiceQue/Services/ITypeRegistry.cs @@ -0,0 +1,9 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace InServiceQue.Services; + +public interface ITypeRegistry +{ + void RegisterTaskType(string taskType, Type type); + IQueueHandler GetService(IServiceScope scope, string taskType); +} \ No newline at end of file diff --git a/InServiceQue/Services/QueueService.cs b/InServiceQue/Services/QueueService.cs new file mode 100644 index 0000000..c866879 --- /dev/null +++ b/InServiceQue/Services/QueueService.cs @@ -0,0 +1,118 @@ +using InServiceQue.Core.Models; +using InServiceQue.Core.Repositories; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace InServiceQue.Services; + +public class QueueService : BackgroundService, IQueueService +where T: IQueueTask +{ + private readonly ITypeRegistry _typeRegistry; + private readonly ITaskRepository _repository; + private readonly IServiceProvider _serviceProvider; + + public QueueService(ITypeRegistry typeRegistry, ITaskRepository repository, IServiceProvider serviceProvider) + { + _typeRegistry = typeRegistry; + _repository = repository; + _serviceProvider = serviceProvider; + } + + public void AddTask(IQueueTask task) + { + var queueTask = new QueueTask(task); + _repository.Insert(queueTask); + } + + public async Task AddTaskAsync(IQueueTask task) + { + var queueTask = new QueueTask(task); + await _repository.InsertAsync(queueTask); + } + + public void TryProcessTask() + { + using (var scope = _serviceProvider.CreateScope()) + { + using var tx = _repository.StartTransaction(); + var queueTask = _repository.GetNextTask(tx); + if (queueTask == null) + { + _repository.RollbackTransaction(tx); + return; + } + + var taskType = queueTask.TaskType; + var handler = _typeRegistry.GetService(scope, taskType); + + try + { + Handle(queueTask, handler); + _repository.SaveTask(queueTask, tx); + _repository.CommitTransaction(tx); + } + catch (Exception e) + { + // ignored + } + } + } + + public async Task TryProcessTaskAsync() + { + using (var scope = _serviceProvider.CreateScope()) + { + using var tx = await _repository.StartTransactionAsync(); + var queueTask = await _repository.GetNextTaskAsync(tx); + if (queueTask == null) + { + await _repository.RollbackTransactionAsync(tx); + return; + } + + var taskType = queueTask.TaskType; + var handler = _typeRegistry.GetService(scope, taskType); + + try + { + await HandleAsync(queueTask, handler); + await _repository.SaveTaskAsync(queueTask, tx); + await _repository.CommitTransactionAsync(tx); + } + catch (Exception e) + { + // ignored + } + } + } + + internal void Handle(QueueTask queueTask, IQueueHandler handler) + { + queueTask.MarkAttempt(); + var success = handler.Handle(queueTask.Payload ?? string.Empty); + if (success) + { + queueTask.SolveTask(); + } + } + + internal async Task HandleAsync(QueueTask queueTask, IQueueHandler handler) + { + queueTask.MarkAttempt(); + var success = await handler.HandleAsync(queueTask.Payload ?? string.Empty); + if (success) + { + queueTask.SolveTask(); + } + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (true) + { + await TryProcessTaskAsync(); + await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); + } + } +} \ No newline at end of file diff --git a/InServiceQue/Services/QueueTypeRegistry.cs b/InServiceQue/Services/QueueTypeRegistry.cs new file mode 100644 index 0000000..9e4b5d7 --- /dev/null +++ b/InServiceQue/Services/QueueTypeRegistry.cs @@ -0,0 +1,19 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace InServiceQue.Services; + +public class QueueTypeRegistry: ITypeRegistry +{ + private static Dictionary _typeRegistry = new Dictionary(); + + public void RegisterTaskType(string taskType, Type type) + { + _typeRegistry.Add(taskType, type); + } + + public IQueueHandler GetService(IServiceScope scope, string taskType) + { + var classType = _typeRegistry[taskType]; + return (IQueueHandler)scope.ServiceProvider.GetService(classType); + } +} \ No newline at end of file