Skip to content

Instantly share code, notes, and snippets.

@hackf5
Last active October 4, 2025 15:55
Show Gist options
  • Select an option

  • Save hackf5/8daabd4e387e5137f17904cc69de4d59 to your computer and use it in GitHub Desktop.

Select an option

Save hackf5/8daabd4e387e5137f17904cc69de4d59 to your computer and use it in GitHub Desktop.
Hangfire Batch Enqueue with PostgreSql storage

I was sick of waiting 10 minutes to push 20k jobs into Hangfire, so Claude and I wrote this.

Tested with Hangfire.Core 1.8.21 and Hangfire.PostgreSql 1.20.12.

Now the 20k jobs go in almost instantly.

Note: you must call this inside an open transaction — the staging table is created with ON COMMIT DROP, so it is only visible to the second command while that transaction is still active.

// spell-checker:ignore NpgsqlDbType,invocationdata,updatecount,createdat,statename,invoc,jobid,stateid,stateids
// spell-checker:ignore jobparameter,jobqueue,timestamptz
#pragma warning disable CA2100 // Review SQL queries for security vulnerabilities
using System.Linq.Expressions;
using System.Text.Json;
using global::Hangfire.Common;
using global::Hangfire.Storage;
using Npgsql;
using NpgsqlTypes;
public static class HangfireRaw
{
    /// <summary>
    /// Bulk-enqueues Hangfire jobs by writing directly to the Hangfire PostgreSQL
    /// tables (job, state, jobparameter, jobqueue), bypassing the much slower
    /// per-job Hangfire client API.
    /// </summary>
    /// <typeparam name="T">Type whose method each job invokes.</typeparam>
    /// <param name="connection">
    /// Open connection. The caller MUST have an active transaction: the staging
    /// table created by <see cref="StageJobDataAsync{T}"/> is ON COMMIT DROP and
    /// must still be visible to the insert command issued here.
    /// </param>
    /// <param name="methodCall">Expression identifying the job method; each entry of
    /// <paramref name="jobParams"/> supplies that method's arguments for one job.</param>
    /// <param name="jobParams">One argument array per job to enqueue.</param>
    /// <param name="queue">Hangfire queue name the jobs are placed on.</param>
    /// <param name="schemaName">Hangfire schema name (typically "hangfire").</param>
    /// <param name="cancellation">Propagated to all database calls.</param>
    /// <exception cref="ArgumentException">
    /// If <paramref name="schemaName"/> is empty or contains a double quote — it is
    /// interpolated into the SQL as an identifier and cannot be parameterized.
    /// </exception>
    public static async Task EnqueueBatchAsync<T>(
        NpgsqlConnection connection,
        Expression<Func<T, Task>> methodCall,
        IReadOnlyList<object[]> jobParams,
        string queue,
        string schemaName,
        CancellationToken cancellation = default)
    {
        // schemaName is spliced into the SQL text below (identifiers cannot be bound
        // as parameters), so reject anything that could escape the quoted identifier.
        if (string.IsNullOrWhiteSpace(schemaName) || schemaName.Contains('"', StringComparison.Ordinal))
        {
            throw new ArgumentException("Schema name must be a plain identifier.", nameof(schemaName));
        }

        // Inserting large numbers of jobs through the Hangfire infrastructure is painfully slow.
        // This one is hard to test, so you just need to take it on faith.
        var now = await StageJobDataAsync(connection, methodCall, jobParams, cancellation);

        // State data for the Enqueued state (EnqueuedAt is a Unix timestamp in
        // milliseconds, stored as a string — matching Hangfire's own serialization).
        var unixTimeMs = now.ToUnixTimeMilliseconds();
        var stateData = new Dictionary<string, string>
        {
            { "Queue", queue },
            { "EnqueuedAt", $"{unixTimeMs}" },
        };
        var stateDataJson = JsonSerializer.Serialize(stateData);

        // await using: the command was previously never disposed.
        await using var cmd = connection.CreateCommand();
        cmd.Parameters.Add(new NpgsqlParameter("stateCreated", NpgsqlDbType.TimestampTz) { Value = now });
        cmd.Parameters.Add(new NpgsqlParameter("stateData", NpgsqlDbType.Jsonb) { Value = stateDataJson });
        cmd.Parameters.Add(new NpgsqlParameter("queue", NpgsqlDbType.Text) { Value = queue });

        // One round trip: insert jobs from the staged rows, create their Enqueued
        // state, default culture job parameters, and queue entries, then back-fill
        // job.stateid/statename from the captured state ids.
        cmd.CommandText = $"""
            CREATE TEMP TABLE t_job_stateids ON COMMIT DROP
            AS
            WITH inserted_jobs AS (
                INSERT INTO "{schemaName}"."job" ("invocationdata", "arguments", "createdat", "updatecount")
                SELECT invocationdata, arguments, createdat, 0
                FROM t_job_batch
                RETURNING "id"
            ),
            inserted_states AS (
                INSERT INTO "{schemaName}"."state" ("jobid", "name", "reason", "createdat", "data", "updatecount")
                SELECT "id", 'Enqueued', NULL, @stateCreated, @stateData::jsonb, 0
                FROM inserted_jobs
                RETURNING "id" AS "stateid", "jobid"
            ),
            inserted_params AS (
                INSERT INTO "{schemaName}"."jobparameter" ("jobid", "name", "value", "updatecount")
                SELECT "jobid", param_name, param_value, 0
                FROM inserted_states
                CROSS JOIN (
                    VALUES
                        ('CurrentCulture', '""'),
                        ('CurrentUICulture', '""')
                ) AS params(param_name, param_value)
                RETURNING 1
            ),
            inserted_queue AS (
                INSERT INTO "{schemaName}"."jobqueue" ("jobid", "queue", "updatecount")
                SELECT "jobid", @queue, 0 FROM inserted_states
                RETURNING 1
            )
            SELECT "jobid", "stateid" FROM inserted_states;
            UPDATE "{schemaName}"."job" j
            SET "stateid" = t."stateid",
                "statename" = 'Enqueued'
            FROM t_job_stateids t
            WHERE j."id" = t."jobid";
            """;
        await cmd.ExecuteNonQueryAsync(cancellation);
    }

    /// <summary>
    /// Stages one row per job into a temp table (t_job_batch) via binary COPY:
    /// the serialized invocation data (with per-job arguments substituted in),
    /// the arguments array itself, and a shared creation timestamp.
    /// </summary>
    /// <returns>The UTC timestamp used as "createdat" for every staged row.</returns>
    private static async Task<DateTimeOffset> StageJobDataAsync<T>(
        NpgsqlConnection connection,
        Expression<Func<T, Task>> methodCall,
        IReadOnlyList<object[]> paramsList,
        CancellationToken cancellation)
    {
        var now = DateTimeOffset.UtcNow;
        var job = Job.FromExpression(methodCall);
        var invocationData = InvocationData.SerializeJob(job);

        // Serialize the invocation data once, then deserialize a mutable template
        // dictionary once; only its "Arguments" entry varies per job. (Previously
        // this round-trip happened inside the loop for every single job.)
        var invocationDataJson = JsonSerializer.Serialize(invocationData);
        var invocationTemplate = JsonSerializer.Deserialize<Dictionary<string, object>>(invocationDataJson)
            ?? throw new InvalidOperationException("Failed to deserialize invocation data JSON");

        var parameters = job.Method.GetParameters();

        // await using: the command was previously never disposed.
        await using var createTableCmd = connection.CreateCommand();
        createTableCmd.CommandText = """
            CREATE TEMP TABLE t_job_batch (
                invocationdata jsonb,
                arguments jsonb,
                createdat timestamptz
            ) ON COMMIT DROP;
            """;
        await createTableCmd.ExecuteNonQueryAsync(cancellation);

        // NpgsqlBinaryImporter is IAsyncDisposable; await using ensures async cleanup.
        await using var writer = await connection.BeginBinaryImportAsync(
            "COPY t_job_batch (invocationdata, arguments, createdat) FROM STDIN (FORMAT BINARY)",
            cancellation);

        foreach (var jobParam in paramsList)
        {
            // Serialize arguments for this job. Hangfire stores each argument
            // individually serialized, so the array holds JSON strings (or null).
            var serializedArgs = new string?[parameters.Length];
            for (var j = 0; j < parameters.Length; j++)
            {
                var paramType = parameters[j].ParameterType;
                var argValue = j < jobParam.Length ? jobParam[j] : null;
                serializedArgs[j] = (paramType, argValue) switch
                {
                    // CancellationToken arguments are injected by Hangfire at run time.
                    (var type, _) when type == typeof(CancellationToken) => null,
                    (_, null) => null,
                    (_, var value) => JsonSerializer.Serialize(value),
                };
            }

            // One serialization serves both places it is needed: the
            // invocationdata "Arguments" entry and the arguments column.
            // (Previously the same array was serialized twice per job.)
            var argumentsJson = JsonSerializer.Serialize(serializedArgs);
            invocationTemplate["Arguments"] = argumentsJson;
            var specificInvocationDataJson = JsonSerializer.Serialize(invocationTemplate);

            await writer.StartRowAsync(cancellation);
            await writer.WriteAsync(specificInvocationDataJson, NpgsqlDbType.Jsonb, cancellation);
            await writer.WriteAsync(argumentsJson, NpgsqlDbType.Jsonb, cancellation);
            await writer.WriteAsync(now, NpgsqlDbType.TimestampTz, cancellation);
        }

        await writer.CompleteAsync(cancellation);
        return now;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment