Skip to content

Instantly share code, notes, and snippets.

@iampato
Created January 21, 2026 12:18
Show Gist options
  • Select an option

  • Save iampato/f0e3ca44a3fde8b9eb53842e16f6157a to your computer and use it in GitHub Desktop.

Select an option

Save iampato/f0e3ca44a3fde8b9eb53842e16f6157a to your computer and use it in GitHub Desktop.
Use BullMQ flows with reversals
import { Injectable, Logger } from '@nestjs/common';
import { FlowProducer, FlowJob } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { QUEUE_NAMES } from '../constants/queue.constants';
export interface WithdrawRequest {
userId: string;
amount: number;
paymentMethodId: string;
transactionId: string;
}
@Injectable()
export class WithdrawFlowWorkflow {
private readonly logger = new Logger(WithdrawFlowWorkflow.name);
private flowProducer: FlowProducer;
constructor(
@InjectQueue(QUEUE_NAMES.PAYMENT_PROCESSING)
private paymentQueue: Queue,
) {
// Initialize FlowProducer with the same Redis connection
this.flowProducer = new FlowProducer({
connection: this.paymentQueue.opts.connection,
});
}
async processWithdraw(withdrawRequest: WithdrawRequest): Promise<string> {
const { userId, amount, transactionId } = withdrawRequest;
// Define the main flow with compensation actions
const flow: FlowJob = {
name: 'withdraw-flow',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: withdrawRequest,
children: [
// Step 1: Validate user and check balance
{
name: 'validate-withdraw',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, amount, transactionId },
opts: {
attempts: 3,
backoff: 'exponential',
},
},
// Step 2: Deduct working balance (with compensation)
{
name: 'deduct-working-balance',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, amount, transactionId },
opts: {
attempts: 3,
backoff: 'exponential',
},
// Compensation job for rollback
children: [
{
name: 'compensate-balance-deduction',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, amount, transactionId, action: 'restore' },
opts: {
attempts: 5,
delay: 1000,
},
},
],
},
// Step 3: Create withdrawal record
{
name: 'create-withdrawal-record',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, amount, transactionId },
opts: {
attempts: 3,
},
children: [
{
name: 'compensate-withdrawal-record',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, transactionId, action: 'cancel' },
opts: {
attempts: 5,
},
},
],
},
// Step 4: Process with PSP (most likely to fail)
{
name: 'process-psp-withdrawal',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: withdrawRequest,
opts: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000,
},
// Add timeout for PSP calls
jobId: `psp-withdraw-${transactionId}`,
},
},
// Step 5: Confirm withdrawal (only if PSP succeeds)
{
name: 'confirm-withdrawal',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { userId, amount, transactionId },
opts: {
attempts: 3,
},
},
],
opts: {
// Flow-level configuration
attempts: 1,
removeOnComplete: 10,
removeOnFail: 50,
jobId: `withdraw-flow-${transactionId}`,
},
};
try {
const flowJob = await this.flowProducer.add(flow);
this.logger.log(`Withdraw flow started: ${flowJob.id} for transaction ${transactionId}`);
return flowJob.id;
} catch (error) {
this.logger.error(`Failed to start withdraw flow: ${error.message}`);
throw error;
}
}
// Manual compensation trigger for complex scenarios
async triggerCompensation(transactionId: string, failedStep: string): Promise<void> {
this.logger.warn(`Triggering compensation for transaction ${transactionId} at step ${failedStep}`);
const compensationFlow: FlowJob = {
name: 'withdraw-compensation-flow',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { transactionId, failedStep, action: 'compensate' },
children: [
// Reverse balance deduction
{
name: 'restore-balance',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { transactionId, action: 'restore' },
opts: { attempts: 5 },
},
// Cancel withdrawal record
{
name: 'cancel-withdrawal',
queueName: QUEUE_NAMES.PAYMENT_PROCESSING,
data: { transactionId, action: 'cancel' },
opts: { attempts: 5 },
},
// Notify user of failure
{
name: 'notify-withdrawal-failed',
queueName: QUEUE_NAMES.NOTIFICATION,
data: { transactionId, reason: `Failed at ${failedStep}` },
opts: { attempts: 3 },
},
],
opts: {
jobId: `compensation-${transactionId}`,
},
};
await this.flowProducer.add(compensationFlow);
}
// Get flow status for monitoring
async getFlowStatus(flowId: string) {
// You can implement flow status checking here
// BullMQ doesn't have built-in flow status, but you can track it
return {
flowId,
status: 'in_progress', // You'd implement this logic
currentStep: 'processing',
failedStep: null,
};
}
async onModuleDestroy() {
await this.flowProducer.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment