|
// spell-checker:ignore NpgsqlDbType,invocationdata,updatecount,createdat,statename,invoc,jobid,stateid,stateids |
|
// spell-checker:ignore jobparameter,jobqueue,timestamptz |
|
|
|
#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities |
|
|
|
using System.Linq.Expressions; |
|
using System.Text.Json; |
|
|
|
using global::Hangfire.Common; |
|
using global::Hangfire.Storage; |
|
|
|
using Npgsql; |
|
|
|
using NpgsqlTypes; |
|
|
|
/// <summary>
/// Bulk-enqueues Hangfire jobs by writing directly to the Hangfire PostgreSQL tables
/// (<c>job</c>, <c>state</c>, <c>jobparameter</c>, <c>jobqueue</c>) instead of going
/// through the Hangfire client API one job at a time.
/// </summary>
public static class HangfireRaw
{
    /// <summary>
    /// Inserts one job per entry of <paramref name="jobParams"/>, each in the
    /// <c>Enqueued</c> state on <paramref name="queue"/>.
    /// </summary>
    /// <remarks>
    /// Both staging tables (<c>t_job_batch</c>, <c>t_job_stateids</c>) are created as
    /// <c>TEMP ... ON COMMIT DROP</c> with fixed names. The caller is therefore expected
    /// to have a transaction open on <paramref name="connection"/> (otherwise PostgreSQL
    /// drops the staging table as soon as its creating statement auto-commits), and a
    /// non-empty batch can only be enqueued once per transaction.
    /// </remarks>
    /// <typeparam name="T">Type that declares the job method.</typeparam>
    /// <param name="connection">Open connection to the database holding the Hangfire schema.</param>
    /// <param name="methodCall">Expression identifying the job method; only the method is used, the per-job arguments come from <paramref name="jobParams"/>.</param>
    /// <param name="jobParams">One positional argument array per job to create.</param>
    /// <param name="queue">Name of the Hangfire queue the jobs are placed on.</param>
    /// <param name="schemaName">Name of the PostgreSQL schema containing the Hangfire tables.</param>
    /// <param name="cancellation">Token used to cancel the database operations.</param>
    /// <exception cref="ArgumentNullException">Any reference argument is <see langword="null"/>.</exception>
    public static async Task EnqueueBatchAsync<T>(
        NpgsqlConnection connection,
        Expression<Func<T, Task>> methodCall,
        IReadOnlyList<object[]> jobParams,
        string queue,
        string schemaName,
        CancellationToken cancellation = default)
    {
        ArgumentNullException.ThrowIfNull(connection);
        ArgumentNullException.ThrowIfNull(methodCall);
        ArgumentNullException.ThrowIfNull(jobParams);
        ArgumentNullException.ThrowIfNull(queue);
        ArgumentNullException.ThrowIfNull(schemaName);

        // Nothing to enqueue: skip the round trips and leave no staging tables behind.
        if (jobParams.Count == 0)
        {
            return;
        }

        // Inserting large numbers of jobs through the Hangfire infrastructure is painfully slow.
        // This one is hard to test, so you just need to take it on faith.
        var now = await StageJobDataAsync(connection, methodCall, jobParams, cancellation);

        // State data for Enqueued state (EnqueuedAt is Unix timestamp in milliseconds as string).
        // Formatted with the invariant culture because the value is machine-read, not displayed.
        var enqueuedAt = now.ToUnixTimeMilliseconds()
            .ToString(System.Globalization.CultureInfo.InvariantCulture);
        var stateData = new Dictionary<string, string>
        {
            { "Queue", queue },
            { "EnqueuedAt", enqueuedAt },
        };
        var stateDataJson = JsonSerializer.Serialize(stateData);

        // Identifiers cannot be sent as parameters, so the schema name is quoted and escaped instead.
        var schema = QuoteIdentifier(schemaName);

        await using var cmd = connection.CreateCommand();
        cmd.Parameters.Add(new NpgsqlParameter("stateCreated", NpgsqlDbType.TimestampTz) { Value = now });
        cmd.Parameters.Add(new NpgsqlParameter("stateData", NpgsqlDbType.Jsonb) { Value = stateDataJson });
        cmd.Parameters.Add(new NpgsqlParameter("queue", NpgsqlDbType.Text) { Value = queue });

        // A single data-modifying CTE chain inserts job -> state -> parameters/queue rows and
        // captures the generated (jobid, stateid) pairs; the trailing UPDATE then points each
        // job at its freshly created state row.
        cmd.CommandText = $"""
            CREATE TEMP TABLE t_job_stateids ON COMMIT DROP
            AS
            WITH inserted_jobs AS (
                INSERT INTO {schema}."job" ("invocationdata", "arguments", "createdat", "updatecount")
                SELECT invocationdata, arguments, createdat, 0
                FROM t_job_batch
                RETURNING "id"
            ),
            inserted_states AS (
                INSERT INTO {schema}."state" ("jobid", "name", "reason", "createdat", "data", "updatecount")
                SELECT "id", 'Enqueued', NULL, @stateCreated, @stateData::jsonb, 0
                FROM inserted_jobs
                RETURNING "id" AS "stateid", "jobid"
            ),
            inserted_params AS (
                INSERT INTO {schema}."jobparameter" ("jobid", "name", "value", "updatecount")
                SELECT "jobid", param_name, param_value, 0
                FROM inserted_states
                CROSS JOIN (
                    VALUES
                        ('CurrentCulture', '""'),
                        ('CurrentUICulture', '""')
                ) AS params(param_name, param_value)
                RETURNING 1
            ),
            inserted_queue AS (
                INSERT INTO {schema}."jobqueue" ("jobid", "queue", "updatecount")
                SELECT "jobid", @queue, 0 FROM inserted_states
                RETURNING 1
            )
            SELECT "jobid", "stateid" FROM inserted_states;

            UPDATE {schema}."job" j
            SET "stateid" = t."stateid",
                "statename" = 'Enqueued'
            FROM t_job_stateids t
            WHERE j."id" = t."jobid";
            """;

        await cmd.ExecuteNonQueryAsync(cancellation);
    }

    /// <summary>
    /// Creates the <c>t_job_batch</c> temp table and bulk-copies one row per job into it
    /// (invocation data, arguments, creation timestamp) using binary COPY.
    /// </summary>
    /// <typeparam name="T">Type that declares the job method.</typeparam>
    /// <param name="connection">Open connection to stage the rows on.</param>
    /// <param name="methodCall">Expression identifying the job method.</param>
    /// <param name="paramsList">One positional argument array per job.</param>
    /// <param name="cancellation">Token used to cancel the database operations.</param>
    /// <returns>The UTC timestamp written as <c>createdat</c> for every staged row.</returns>
    /// <exception cref="InvalidOperationException">The serialized invocation data could not be read back.</exception>
    private static async Task<DateTimeOffset> StageJobDataAsync<T>(
        NpgsqlConnection connection,
        Expression<Func<T, Task>> methodCall,
        IReadOnlyList<object[]> paramsList,
        CancellationToken cancellation)
    {
        var now = DateTimeOffset.UtcNow;
        var job = Job.FromExpression(methodCall);
        var parameters = job.Method.GetParameters();

        // The invocation data is identical for every job except for "Arguments", so it is
        // round-tripped into a mutable dictionary once and only that entry is replaced per row.
        var invocationDataJson = JsonSerializer.Serialize(InvocationData.SerializeJob(job));
        var invocationTemplate = JsonSerializer.Deserialize<Dictionary<string, object>>(invocationDataJson)
            ?? throw new InvalidOperationException("Failed to deserialize invocation data JSON");

        // Scoped so the command is disposed before the connection switches into COPY mode.
        await using (var createTableCmd = connection.CreateCommand())
        {
            createTableCmd.CommandText = """
                CREATE TEMP TABLE t_job_batch (
                    invocationdata jsonb,
                    arguments jsonb,
                    createdat timestamptz
                ) ON COMMIT DROP;
                """;
            await createTableCmd.ExecuteNonQueryAsync(cancellation);
        }

        await using var writer = await connection.BeginBinaryImportAsync(
            "COPY t_job_batch (invocationdata, arguments, createdat) FROM STDIN (FORMAT BINARY)",
            cancellation);

        foreach (var jobParam in paramsList)
        {
            // JSON array whose elements are themselves JSON-encoded argument strings.
            // The same text is used twice: as the jsonb "arguments" column (stored as an
            // array) and as the string value of "Arguments" inside the invocation data
            // (where serializing the dictionary encodes it once more).
            var argumentsJson = JsonSerializer.Serialize(SerializeArguments(jobParam));

            invocationTemplate["Arguments"] = argumentsJson;
            var specificInvocationDataJson = JsonSerializer.Serialize(invocationTemplate);

            await writer.StartRowAsync(cancellation);
            await writer.WriteAsync(specificInvocationDataJson, NpgsqlDbType.Jsonb, cancellation);
            await writer.WriteAsync(argumentsJson, NpgsqlDbType.Jsonb, cancellation);
            await writer.WriteAsync(now, NpgsqlDbType.TimestampTz, cancellation);
        }

        await writer.CompleteAsync(cancellation);

        return now;

        // Serializes one job's positional values against the method signature. Slots for
        // CancellationToken parameters, null values, and values missing from a too-short
        // array are all emitted as null.
        string?[] SerializeArguments(object[] values)
        {
            var serialized = new string?[parameters.Length];

            for (var i = 0; i < parameters.Length; i++)
            {
                var value = i < values.Length ? values[i] : null;
                var isCancellationToken = parameters[i].ParameterType == typeof(CancellationToken);

                serialized[i] = isCancellationToken || value is null
                    ? null
                    : JsonSerializer.Serialize(value);
            }

            return serialized;
        }
    }

    /// <summary>
    /// Wraps <paramref name="identifier"/> in double quotes, doubling any embedded double
    /// quote, so it can be spliced into SQL text as a single quoted identifier.
    /// </summary>
    /// <param name="identifier">Raw identifier, e.g. a schema name.</param>
    /// <returns>The quoted, escaped identifier.</returns>
    private static string QuoteIdentifier(string identifier) =>
        "\"" + identifier.Replace("\"", "\"\"", StringComparison.Ordinal) + "\"";
}