Files
InServiceQue/InServiceQue.Postgres/TaskRepository.cs
2024-03-10 00:43:57 +07:00

122 lines
3.8 KiB
C#

using System.Data;
using Dapper;
using InServiceQue.Core.Models;
using InServiceQue.Core.Repositories;
using Npgsql;
namespace InServiceQue.Postgres;
/// <summary>
/// <see cref="ITaskRepository"/> implementation that stores <see cref="QueueTask"/>
/// rows in the <c>QUEUE</c> table through a single <see cref="NpgsqlConnection"/>
/// using Dapper.
/// </summary>
/// <remarks>
/// The repository owns its connection and disposes it in <see cref="Dispose"/>.
/// Column names are taken from the <see cref="QueueTask"/> property names via
/// <c>nameof</c>, so the table is expected to use matching column names.
/// </remarks>
public class TaskRepository : ITaskRepository, IDisposable
{
    // Shared by Insert/InsertAsync so the two variants cannot drift apart.
    private static readonly string InsertSql =
        $"INSERT INTO QUEUE({nameof(QueueTask.Id)}, {nameof(QueueTask.TaskType)}, {nameof(QueueTask.DateCreated)}, {nameof(QueueTask.Payload)}) VALUES (@{nameof(QueueTask.Id)}, @{nameof(QueueTask.TaskType)}, @{nameof(QueueTask.DateCreated)}, @{nameof(QueueTask.Payload)})";

    // Oldest row first; "for update skip locked" locks the chosen row and skips
    // rows already locked by other transactions.
    // NOTE(review): there is no filter on processed/closed tasks here — confirm
    // whether finished rows are removed elsewhere or a WHERE clause is needed.
    private static readonly string NextTaskSql = $@"select * from QUEUE
        order by {nameof(QueueTask.DateCreated)}
        limit 1
        for update skip locked";

    // The WHERE clause restricts the update to the task being saved; without it
    // every row in QUEUE would be overwritten.
    private static readonly string SaveTaskSql = $@"UPDATE QUEUE
        SET {nameof(QueueTask.DateProcessed)} = @{nameof(QueueTask.DateProcessed)},
            {nameof(QueueTask.DateClosed)} = @{nameof(QueueTask.DateClosed)},
            {nameof(QueueTask.Attempts)} = @{nameof(QueueTask.Attempts)},
            {nameof(QueueTask.Payload)} = @{nameof(QueueTask.Payload)}
        WHERE {nameof(QueueTask.Id)} = @{nameof(QueueTask.Id)}";

    private readonly NpgsqlConnection _connection;

    /// <summary>Creates a repository over a new, not yet opened, connection.</summary>
    /// <param name="connectionString">Npgsql connection string.</param>
    public TaskRepository(string connectionString)
    {
        _connection = new NpgsqlConnection(connectionString);
    }

    /// <summary>Inserts a new task row (Id, TaskType, DateCreated, Payload).</summary>
    /// <param name="task">Task whose values are bound to the INSERT parameters.</param>
    public void Insert(QueueTask task)
    {
        _connection.Execute(InsertSql, task);
    }

    /// <summary>Asynchronously inserts a new task row (Id, TaskType, DateCreated, Payload).</summary>
    /// <param name="task">Task whose values are bound to the INSERT parameters.</param>
    public async Task InsertAsync(QueueTask task)
    {
        await _connection.ExecuteAsync(InsertSql, task);
    }

    /// <summary>Begins a transaction on the repository's connection, opening it if needed.</summary>
    /// <returns>The started transaction.</returns>
    public IDbTransaction StartTransaction()
    {
        // BeginTransaction requires an open connection and nothing else opens it.
        EnsureConnectionOpen();
        return _connection.BeginTransaction();
    }

    /// <summary>Asynchronously begins a transaction, opening the connection if needed.</summary>
    /// <returns>The started transaction.</returns>
    public async Task<IDbTransaction> StartTransactionAsync()
    {
        await EnsureConnectionOpenAsync();
        return await _connection.BeginTransactionAsync();
    }

    /// <summary>Commits <paramref name="tx"/>.</summary>
    /// <param name="tx">Transaction to commit.</param>
    public void CommitTransaction(IDbTransaction tx)
    {
        tx.Commit();
    }

    /// <summary>Commits <paramref name="tx"/>, asynchronously when it is an Npgsql transaction.</summary>
    /// <param name="tx">Transaction to commit.</param>
    public async Task CommitTransactionAsync(IDbTransaction tx)
    {
        if (tx is NpgsqlTransaction npgsqlTx)
        {
            await npgsqlTx.CommitAsync();
        }
        else
        {
            // IDbTransaction has no async API; fall back instead of failing the cast.
            tx.Commit();
        }
    }

    /// <summary>Rolls back <paramref name="tx"/>.</summary>
    /// <param name="tx">Transaction to roll back.</param>
    public void RollbackTransaction(IDbTransaction tx)
    {
        tx.Rollback();
    }

    /// <summary>Rolls back <paramref name="tx"/>, asynchronously when it is an Npgsql transaction.</summary>
    /// <param name="tx">Transaction to roll back.</param>
    public async Task RollbackTransactionAsync(IDbTransaction tx)
    {
        if (tx is NpgsqlTransaction npgsqlTx)
        {
            await npgsqlTx.RollbackAsync();
        }
        else
        {
            // IDbTransaction has no async API; fall back instead of failing the cast.
            tx.Rollback();
        }
    }

    /// <summary>
    /// Selects the row with the earliest DateCreated and locks it for update,
    /// skipping rows locked by other transactions.
    /// </summary>
    /// <param name="tx">Transaction the query runs in.</param>
    /// <returns>The selected task, or <c>null</c> when no row is returned.</returns>
    public QueueTask? GetNextTask(IDbTransaction tx)
    {
        // Pass tx by name: the third positional argument of Dapper's API is the
        // parameter object, not the transaction.
        return _connection.QuerySingleOrDefault<QueueTask>(NextTaskSql, transaction: tx);
    }

    /// <summary>
    /// Asynchronously selects the row with the earliest DateCreated and locks it
    /// for update, skipping rows locked by other transactions.
    /// </summary>
    /// <param name="tx">Transaction the query runs in.</param>
    /// <returns>The selected task, or <c>null</c> when no row is returned.</returns>
    public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx)
    {
        return await _connection.QuerySingleOrDefaultAsync<QueueTask>(NextTaskSql, transaction: tx);
    }

    /// <summary>
    /// Writes DateProcessed, DateClosed, Attempts and Payload of <paramref name="task"/>
    /// to the row with the same Id.
    /// </summary>
    /// <param name="task">Task carrying the Id and the values to store.</param>
    /// <param name="tx">Transaction the update runs in.</param>
    public void SaveTask(QueueTask task, IDbTransaction tx)
    {
        _connection.Execute(SaveTaskSql, task, tx);
    }

    /// <summary>
    /// Asynchronously writes DateProcessed, DateClosed, Attempts and Payload of
    /// <paramref name="task"/> to the row with the same Id.
    /// </summary>
    /// <param name="task">Task carrying the Id and the values to store.</param>
    /// <param name="tx">Transaction the update runs in.</param>
    public async Task SaveTaskAsync(QueueTask task, IDbTransaction tx)
    {
        await _connection.ExecuteAsync(SaveTaskSql, task, tx);
    }

    /// <summary>Disposes the underlying connection.</summary>
    public void Dispose()
    {
        // No finalizer: the connection is a managed object and must not be
        // touched from the finalizer thread.
        _connection.Dispose();
        GC.SuppressFinalize(this);
    }

    // Opens the connection when it is closed; no-op otherwise.
    private void EnsureConnectionOpen()
    {
        if (_connection.State == ConnectionState.Closed)
        {
            _connection.Open();
        }
    }

    // Async counterpart of EnsureConnectionOpen.
    private async Task EnsureConnectionOpenAsync()
    {
        if (_connection.State == ConnectionState.Closed)
        {
            await _connection.OpenAsync();
        }
    }
}