Created
January 21, 2026 12:18
-
-
Save iampato/f0e3ca44a3fde8b9eb53842e16f6157a to your computer and use it in GitHub Desktop.
Use BullMQ flows with reversals
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Injectable, Logger } from '@nestjs/common'; | |
| import { FlowProducer, FlowJob } from 'bullmq'; | |
| import { InjectQueue } from '@nestjs/bullmq'; | |
| import { Queue } from 'bullmq'; | |
| import { QUEUE_NAMES } from '../constants/queue.constants'; | |
| export interface WithdrawRequest { | |
| userId: string; | |
| amount: number; | |
| paymentMethodId: string; | |
| transactionId: string; | |
| } | |
| @Injectable() | |
| export class WithdrawFlowWorkflow { | |
| private readonly logger = new Logger(WithdrawFlowWorkflow.name); | |
| private flowProducer: FlowProducer; | |
| constructor( | |
| @InjectQueue(QUEUE_NAMES.PAYMENT_PROCESSING) | |
| private paymentQueue: Queue, | |
| ) { | |
| // Initialize FlowProducer with the same Redis connection | |
| this.flowProducer = new FlowProducer({ | |
| connection: this.paymentQueue.opts.connection, | |
| }); | |
| } | |
| async processWithdraw(withdrawRequest: WithdrawRequest): Promise<string> { | |
| const { userId, amount, transactionId } = withdrawRequest; | |
| // Define the main flow with compensation actions | |
| const flow: FlowJob = { | |
| name: 'withdraw-flow', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: withdrawRequest, | |
| children: [ | |
| // Step 1: Validate user and check balance | |
| { | |
| name: 'validate-withdraw', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, amount, transactionId }, | |
| opts: { | |
| attempts: 3, | |
| backoff: 'exponential', | |
| }, | |
| }, | |
| // Step 2: Deduct working balance (with compensation) | |
| { | |
| name: 'deduct-working-balance', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, amount, transactionId }, | |
| opts: { | |
| attempts: 3, | |
| backoff: 'exponential', | |
| }, | |
| // Compensation job for rollback | |
| children: [ | |
| { | |
| name: 'compensate-balance-deduction', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, amount, transactionId, action: 'restore' }, | |
| opts: { | |
| attempts: 5, | |
| delay: 1000, | |
| }, | |
| }, | |
| ], | |
| }, | |
| // Step 3: Create withdrawal record | |
| { | |
| name: 'create-withdrawal-record', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, amount, transactionId }, | |
| opts: { | |
| attempts: 3, | |
| }, | |
| children: [ | |
| { | |
| name: 'compensate-withdrawal-record', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, transactionId, action: 'cancel' }, | |
| opts: { | |
| attempts: 5, | |
| }, | |
| }, | |
| ], | |
| }, | |
| // Step 4: Process with PSP (most likely to fail) | |
| { | |
| name: 'process-psp-withdrawal', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: withdrawRequest, | |
| opts: { | |
| attempts: 3, | |
| backoff: { | |
| type: 'exponential', | |
| delay: 5000, | |
| }, | |
| // Add timeout for PSP calls | |
| jobId: `psp-withdraw-${transactionId}`, | |
| }, | |
| }, | |
| // Step 5: Confirm withdrawal (only if PSP succeeds) | |
| { | |
| name: 'confirm-withdrawal', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { userId, amount, transactionId }, | |
| opts: { | |
| attempts: 3, | |
| }, | |
| }, | |
| ], | |
| opts: { | |
| // Flow-level configuration | |
| attempts: 1, | |
| removeOnComplete: 10, | |
| removeOnFail: 50, | |
| jobId: `withdraw-flow-${transactionId}`, | |
| }, | |
| }; | |
| try { | |
| const flowJob = await this.flowProducer.add(flow); | |
| this.logger.log(`Withdraw flow started: ${flowJob.id} for transaction ${transactionId}`); | |
| return flowJob.id; | |
| } catch (error) { | |
| this.logger.error(`Failed to start withdraw flow: ${error.message}`); | |
| throw error; | |
| } | |
| } | |
| // Manual compensation trigger for complex scenarios | |
| async triggerCompensation(transactionId: string, failedStep: string): Promise<void> { | |
| this.logger.warn(`Triggering compensation for transaction ${transactionId} at step ${failedStep}`); | |
| const compensationFlow: FlowJob = { | |
| name: 'withdraw-compensation-flow', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { transactionId, failedStep, action: 'compensate' }, | |
| children: [ | |
| // Reverse balance deduction | |
| { | |
| name: 'restore-balance', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { transactionId, action: 'restore' }, | |
| opts: { attempts: 5 }, | |
| }, | |
| // Cancel withdrawal record | |
| { | |
| name: 'cancel-withdrawal', | |
| queueName: QUEUE_NAMES.PAYMENT_PROCESSING, | |
| data: { transactionId, action: 'cancel' }, | |
| opts: { attempts: 5 }, | |
| }, | |
| // Notify user of failure | |
| { | |
| name: 'notify-withdrawal-failed', | |
| queueName: QUEUE_NAMES.NOTIFICATION, | |
| data: { transactionId, reason: `Failed at ${failedStep}` }, | |
| opts: { attempts: 3 }, | |
| }, | |
| ], | |
| opts: { | |
| jobId: `compensation-${transactionId}`, | |
| }, | |
| }; | |
| await this.flowProducer.add(compensationFlow); | |
| } | |
| // Get flow status for monitoring | |
| async getFlowStatus(flowId: string) { | |
| // You can implement flow status checking here | |
| // BullMQ doesn't have built-in flow status, but you can track it | |
| return { | |
| flowId, | |
| status: 'in_progress', // You'd implement this logic | |
| currentStep: 'processing', | |
| failedStep: null, | |
| }; | |
| } | |
| async onModuleDestroy() { | |
| await this.flowProducer.close(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment