122 lines
3.8 KiB
C#
122 lines
3.8 KiB
C#
using System.Data;
|
|
using Dapper;
|
|
using InServiceQue.Core.Models;
|
|
using InServiceQue.Core.Repositories;
|
|
using Npgsql;
|
|
|
|
namespace InServiceQue.Postgres;
|
|
|
|
/// <summary>
/// PostgreSQL-backed <see cref="ITaskRepository"/> that stores <see cref="QueueTask"/> rows
/// in the <c>QUEUE</c> table using Dapper over a single <see cref="NpgsqlConnection"/>.
/// </summary>
/// <remarks>
/// The repository owns one connection for its whole lifetime; call <see cref="Dispose"/> to release it.
/// Column names are taken from the <see cref="QueueTask"/> property names via <c>nameof</c>.
/// </remarks>
public class TaskRepository: ITaskRepository
{
    // Shared by Insert and InsertAsync so the two variants cannot drift apart.
    private static readonly string InsertSql =
        $"INSERT INTO QUEUE({nameof(QueueTask.Id)}, {nameof(QueueTask.TaskType)}, {nameof(QueueTask.DateCreated)}, {nameof(QueueTask.Payload)}) VALUES (@{nameof(QueueTask.Id)}, @{nameof(QueueTask.TaskType)}, @{nameof(QueueTask.DateCreated)}, @{nameof(QueueTask.Payload)})";

    // Shared by SaveTask and SaveTaskAsync. The WHERE clause restricts the update to the
    // row of the task being saved; without it every row in QUEUE would be overwritten.
    private static readonly string UpdateSql = $@"UPDATE QUEUE
SET {nameof(QueueTask.DateProcessed)} = @{nameof(QueueTask.DateProcessed)},
    {nameof(QueueTask.DateClosed)} = @{nameof(QueueTask.DateClosed)},
    {nameof(QueueTask.Attempts)} = @{nameof(QueueTask.Attempts)},
    {nameof(QueueTask.Payload)} = @{nameof(QueueTask.Payload)}
WHERE {nameof(QueueTask.Id)} = @{nameof(QueueTask.Id)}";

    private readonly NpgsqlConnection _connection;

    /// <summary>
    /// Creates a repository bound to a new, not yet opened, connection.
    /// </summary>
    /// <param name="connectionString">Npgsql connection string of the database holding the <c>QUEUE</c> table.</param>
    public TaskRepository(string connectionString)
    {
        _connection = new NpgsqlConnection(connectionString);
    }

    /// <summary>
    /// Inserts a new row built from the task's Id, TaskType, DateCreated and Payload.
    /// </summary>
    /// <param name="task">Task whose properties supply the statement parameters.</param>
    public void Insert(QueueTask task)
    {
        _connection.Execute(InsertSql, task);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Insert"/>.
    /// </summary>
    /// <param name="task">Task whose properties supply the statement parameters.</param>
    public async Task InsertAsync(QueueTask task)
    {
        await _connection.ExecuteAsync(InsertSql, task).ConfigureAwait(false);
    }

    /// <summary>
    /// Opens the connection if necessary and begins a transaction on it.
    /// </summary>
    /// <returns>The started transaction.</returns>
    public IDbTransaction StartTransaction()
    {
        // BeginTransaction requires an open connection; Dapper only auto-opens for its own commands.
        EnsureOpen();
        return _connection.BeginTransaction();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="StartTransaction"/>.
    /// </summary>
    /// <returns>The started transaction.</returns>
    public async Task<IDbTransaction> StartTransactionAsync()
    {
        await EnsureOpenAsync().ConfigureAwait(false);
        return await _connection.BeginTransactionAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Commits the given transaction.
    /// </summary>
    /// <param name="tx">Transaction to commit.</param>
    public void CommitTransaction(IDbTransaction tx)
    {
        tx.Commit();
    }

    /// <summary>
    /// Commits the given transaction, asynchronously when it is an <see cref="NpgsqlTransaction"/>.
    /// </summary>
    /// <param name="tx">Transaction to commit.</param>
    public async Task CommitTransactionAsync(IDbTransaction tx)
    {
        if (tx is NpgsqlTransaction npgsqlTx)
        {
            await npgsqlTx.CommitAsync().ConfigureAwait(false);
        }
        else
        {
            // Other IDbTransaction implementations expose no async API; commit synchronously
            // instead of failing with an InvalidCastException.
            tx.Commit();
        }
    }

    /// <summary>
    /// Rolls back the given transaction.
    /// </summary>
    /// <param name="tx">Transaction to roll back.</param>
    public void RollbackTransaction(IDbTransaction tx)
    {
        tx.Rollback();
    }

    /// <summary>
    /// Rolls back the given transaction, asynchronously when it is an <see cref="NpgsqlTransaction"/>.
    /// </summary>
    /// <param name="tx">Transaction to roll back.</param>
    public async Task RollbackTransactionAsync(IDbTransaction tx)
    {
        if (tx is NpgsqlTransaction npgsqlTx)
        {
            await npgsqlTx.RollbackAsync().ConfigureAwait(false);
        }
        else
        {
            tx.Rollback();
        }
    }

    /// <summary>
    /// Selects the row with the smallest DateCreated and locks it for update, skipping rows
    /// that are already locked (<c>for update skip locked</c>).
    /// </summary>
    /// <param name="tx">Transaction the query runs in.</param>
    /// <param name="taskType">When not <c>null</c>, only rows with this TaskType are considered.</param>
    /// <returns>The selected task, or <c>null</c> when no row qualifies.</returns>
    public QueueTask? GetNextTask(IDbTransaction tx, string? taskType = null)
    {
        // tx must be passed as the transaction argument; the second positional argument is Dapper's "param".
        return _connection.QuerySingleOrDefault<QueueTask>(
            BuildNextTaskSql(taskType),
            BuildNextTaskParameters(taskType),
            tx);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="GetNextTask"/>.
    /// </summary>
    /// <param name="tx">Transaction the query runs in.</param>
    /// <param name="taskType">When not <c>null</c>, only rows with this TaskType are considered.</param>
    /// <returns>The selected task, or <c>null</c> when no row qualifies.</returns>
    public async Task<QueueTask?> GetNextTaskAsync(IDbTransaction tx, string? taskType = null)
    {
        return await _connection.QuerySingleOrDefaultAsync<QueueTask>(
                BuildNextTaskSql(taskType),
                BuildNextTaskParameters(taskType),
                tx)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Writes the task's DateProcessed, DateClosed, Attempts and Payload to the row with the task's Id.
    /// </summary>
    /// <param name="task">Task whose properties supply the statement parameters.</param>
    /// <param name="tx">Transaction the update runs in.</param>
    public void SaveTask(QueueTask task, IDbTransaction tx)
    {
        _connection.Execute(UpdateSql, task, tx);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="SaveTask"/>.
    /// </summary>
    /// <param name="task">Task whose properties supply the statement parameters.</param>
    /// <param name="tx">Transaction the update runs in.</param>
    public async Task SaveTaskAsync(QueueTask task, IDbTransaction tx)
    {
        await _connection.ExecuteAsync(UpdateSql, task, tx).ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes the underlying connection.
    /// </summary>
    public void Dispose()
    {
        // No finalizer: the only resource held is a managed NpgsqlConnection, which must not be
        // touched from a finalizer. SuppressFinalize is kept for derived types that add one.
        _connection.Dispose();
        GC.SuppressFinalize(this);
    }

    // Builds the "next task" query; the TaskType filter is emitted only when a type is requested.
    // NOTE(review): rows with DateClosed set are not excluded here - confirm whether closed
    // tasks should be filtered out of the queue.
    private static string BuildNextTaskSql(string? taskType)
    {
        var filter = taskType is null
            ? string.Empty
            : $"where {nameof(QueueTask.TaskType)} = @{nameof(taskType)}";

        return $@"select * from QUEUE
{filter}
order by {nameof(QueueTask.DateCreated)}
limit 1
for update skip locked";
    }

    // Parameter object matching BuildNextTaskSql: none when there is no filter.
    private static object? BuildNextTaskParameters(string? taskType)
    {
        return taskType is null ? null : new { taskType };
    }

    // Opens the connection when it is closed so a transaction can be started on it.
    private void EnsureOpen()
    {
        if (_connection.State == ConnectionState.Closed)
        {
            _connection.Open();
        }
    }

    // Asynchronous counterpart of EnsureOpen.
    private async Task EnsureOpenAsync()
    {
        if (_connection.State == ConnectionState.Closed)
        {
            await _connection.OpenAsync().ConfigureAwait(false);
        }
    }
}