140 lines
3.3 KiB
C#
140 lines
3.3 KiB
C#
using System.Data;
|
|
using InServiceQue.Core.Models;
|
|
using InServiceQue.Core.Repositories;
|
|
|
|
namespace InServiceQue.InMemory;
|
|
|
|
public class TaskRepositoryInMemory: ITaskRepository
|
|
{
|
|
private static object _lockObject = new();
|
|
private static int _currentShard;
|
|
private static Dictionary<string, Queue<QueueTask>> _quesByType = new();
|
|
public void Dispose()
|
|
{
|
|
// TODO release managed resources here
|
|
}
|
|
|
|
public void Insert(QueueTask task)
|
|
{
|
|
lock (_lockObject)
|
|
{
|
|
if (_quesByType.ContainsKey(task.TaskType))
|
|
{
|
|
_quesByType[task.TaskType].Enqueue(task);
|
|
}
|
|
else
|
|
{
|
|
_quesByType.Add(task.TaskType, new Queue<QueueTask>(new []{task}));
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
public async Task InsertAsync(QueueTask task)
|
|
{
|
|
Insert(task);
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
public IDbTransaction StartTransaction()
|
|
{
|
|
return null;
|
|
}
|
|
|
|
public async Task<IDbTransaction> StartTransactionAsync()
|
|
{
|
|
return await Task.FromResult<IDbTransaction>(null);
|
|
}
|
|
|
|
public void CommitTransaction(IDbTransaction tx)
|
|
{
|
|
|
|
}
|
|
|
|
public async Task CommitTransactionAsync(IDbTransaction tx)
|
|
{
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
public void RollbackTransaction(IDbTransaction tx)
|
|
{
|
|
|
|
}
|
|
|
|
public async Task RollbackTransactionAsync(IDbTransaction tx)
|
|
{
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
|
|
{
|
|
if (taskType == null)
|
|
return GetNextTaskRolling();
|
|
|
|
QueueTask? task = null;
|
|
if (_quesByType.TryGetValue(taskType, out var que))
|
|
{
|
|
que.TryDequeue(out task);
|
|
}
|
|
|
|
return task;
|
|
}
|
|
|
|
public QueueTask? GetNextTaskRolling()
|
|
{
|
|
QueueTask? task = null;
|
|
lock (_lockObject)
|
|
{
|
|
var topicsCount = _quesByType.Count;
|
|
for (int i = 0; i < topicsCount; i++)
|
|
{
|
|
if (i == _currentShard)
|
|
{
|
|
var keyValuePair = _quesByType.ToArray()[i];
|
|
var que = keyValuePair.Value;
|
|
task = que.Dequeue();
|
|
|
|
if (!que.Any())
|
|
{
|
|
_quesByType.Remove(keyValuePair.Key);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
_currentShard = _currentShard == topicsCount - 1 ? 0 : _currentShard + 1;
|
|
}
|
|
|
|
return task;
|
|
}
|
|
|
|
|
|
|
|
public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
|
|
{
|
|
if (taskType == null)
|
|
return await Task.FromResult(GetNextTaskRolling());
|
|
|
|
QueueTask? task = null;
|
|
lock (_lockObject)
|
|
{
|
|
if (_quesByType.TryGetValue(taskType, out var que))
|
|
{
|
|
que.TryDequeue(out task);
|
|
}
|
|
}
|
|
|
|
|
|
return await Task.FromResult(task);
|
|
}
|
|
|
|
public void SaveTask(QueueTask task, IDbTransaction tx)
|
|
{
|
|
|
|
}
|
|
|
|
public async Task SaveTaskAsync(QueueTask task, IDbTransaction tx)
|
|
{
|
|
await Task.CompletedTask;
|
|
}
|
|
} |