Back to Blog
Step-Based State Machines in BullMQ Jobs for Resilient Workflows

Step-Based State Machines in BullMQ Jobs for Resilient Workflows

January 3, 2026
Stefan Mentović
bullmqstate-machinesworkflowstypescriptresilience

Build production-ready resumable workflows in BullMQ using step-based state machines. Learn how to handle long-running jobs that gracefully recover from failures.

#Step-Based State Machines in BullMQ Jobs for Resilient Workflows

Your BullMQ job is halfway through a complex, multi-stage data processing pipeline—generating reports, calling APIs, transforming data—when suddenly, an API timeout kills it. Now you have to start from scratch. Sound familiar?

Long-running jobs that fail mid-execution are one of the most frustrating challenges in distributed systems. Retrying from the beginning wastes resources, time, and money. But what if your jobs could remember where they left off and resume from the exact point of failure?

In this guide, we'll explore how to implement step-based state machines in BullMQ jobs to build resumable, resilient workflows that gracefully handle failures and retries without repeating work.

#The Problem: All-or-Nothing Job Execution

Traditional BullMQ jobs follow an all-or-nothing pattern:

async function processJob(job: Job) {
	// Step 1: Fetch data from external API
	const data = await fetchDataFromAPI(job.data.id);

	// Step 2: Transform data
	const transformed = await transformData(data);

	// Step 3: Save to database
	await saveToDatabase(transformed);

	// Step 4: Send notifications
	await sendNotifications(job.data.id);

	return { success: true };
}

If Step 3 fails after Step 1 and Step 2 succeed, BullMQ retries the entire job — repeating the expensive API call and transformation work unnecessarily.

The core issues:

  1. No state persistence: The job doesn't remember which steps completed
  2. Wasted resources: Successful steps repeat on retry
  3. Inconsistent state: Partial work might leave systems in an inconsistent state
  4. Poor observability: Hard to know exactly where failures occurred

#The Solution: Step-Based State Machines

A step-based state machine uses a step field in job data to track progress through a multi-stage workflow. Each step:

  1. Checks current state: "Where did we leave off?"
  2. Performs its work: Executes only the necessary logic
  3. Updates state: Marks completion and advances to the next step
  4. Handles failures gracefully: Retries resume from the failed step

This pattern leverages BullMQ's built-in features:

  • Job data persistence: job.data survives retries
  • WaitingChildrenError: Pause parent jobs while children execute
  • Child job dependencies: Coordinate multi-stage workflows
  • Retry mechanisms: Built-in exponential backoff

Reference the BullMQ job data documentation for details on job persistence and the WaitingChildrenError pattern for parent-child coordination.

#Implementing the Step Pattern

#Basic Step Structure

Here's a minimal step-based job processor:

import { Worker, Job } from 'bullmq';

interface JobData {
	userId: string;
	orderId: string;
	step?: 'validate' | 'process' | 'notify' | 'complete';
}

interface JobResult {
	success: boolean;
	step: string;
	data?: any;
}

async function processOrderJob(job: Job<JobData>): Promise<JobResult> {
	// Get current step (defaults to first step)
	const step = job.data.step || 'validate';

	try {
		// Execute logic based on current step
		switch (step) {
			case 'validate':
				await validateOrder(job);
				// Advance to next step
				await job.updateData({ ...job.data, step: 'process' });
				return processOrderJob(job); // Continue immediately

			case 'process':
				await processPayment(job);
				await job.updateData({ ...job.data, step: 'notify' });
				return processOrderJob(job);

			case 'notify':
				await sendConfirmationEmail(job);
				await job.updateData({ ...job.data, step: 'complete' });
				return processOrderJob(job);

			case 'complete':
				return {
					success: true,
					step: 'complete',
					data: { orderId: job.data.orderId },
				};

			default:
				throw new Error(`Unknown step: ${step}`);
		}
	} catch (error) {
		// Error bubbles up, BullMQ retries from current step
		throw error;
	}
}

// Helper functions
async function validateOrder(job: Job<JobData>) {
	await job.updateProgress(10);
	// Validate order data, check inventory, etc.
	const isValid = await checkInventory(job.data.orderId);
	if (!isValid) {
		throw new Error('Order validation failed');
	}
}

async function processPayment(job: Job<JobData>) {
	await job.updateProgress(50);
	// Process payment with external API
	await callPaymentAPI(job.data.orderId);
}

async function sendConfirmationEmail(job: Job<JobData>) {
	await job.updateProgress(90);
	// Send email notification
	await emailService.send(job.data.userId, 'Order confirmed');
}

Key benefits of this pattern:

  • Automatic resume: If processPayment fails, retry starts at process step
  • Progress tracking: Each step reports progress independently
  • State visibility: Job data shows exactly which step failed
  • Idempotency support: Steps can check completion state before executing

#Managing Child Jobs with WaitingChildrenError

For complex workflows with parallel operations, use child jobs and WaitingChildrenError to pause execution:

import { Worker, Job, WaitingChildrenError } from 'bullmq';
import { reportQueue, analysisQueue, notificationQueue } from './queues';

interface ReportJobData {
	reportId: string;
	dataSourceIds: string[];
	step?: 'fetch' | 'wait-analysis' | 'aggregate' | 'complete';
}

async function processReportJob(job: Job<ReportJobData>, token?: string): Promise<any> {
	const step = job.data.step || 'fetch';

	try {
		// ================================================
		// STEP 1: Fetch Data Sources
		// ================================================
		if (step === 'fetch') {
			await job.updateProgress(10);

			// Fetch data from multiple sources
			const dataResults = await Promise.all(job.data.dataSourceIds.map((id) => fetchDataSource(id)));

			// Store fetched data in job data for next step
			await job.updateData({
				...job.data,
				fetchedData: dataResults,
				step: 'wait-analysis',
			});

			// Spawn child jobs for parallel analysis
			const childJobPromises = dataResults.map((data, index) =>
				analysisQueue.add(
					`analysis-${index}`,
					{ data, reportId: job.data.reportId },
					{
						parent: {
							id: job.id!,
							queue: job.queueQualifiedName,
						},
					},
				),
			);

			await Promise.all(childJobPromises);

			// Pause this job until children complete
			const shouldWait = await job.moveToWaitingChildren(token ?? '');
			if (shouldWait) {
				throw new WaitingChildrenError();
			}
		}

		// ================================================
		// STEP 2: Aggregate Analysis Results
		// ================================================
		if (step === 'wait-analysis') {
			await job.updateProgress(60);

			// Collect results from child jobs
			const childrenResults = await job.getChildrenValues();
			const analysisResults = Object.values(childrenResults);

			// Aggregate and transform
			const aggregatedReport = await aggregateResults(analysisResults, job.data.reportId);

			// Save aggregated report
			await saveReport(job.data.reportId, aggregatedReport);

			await job.updateData({
				...job.data,
				step: 'complete',
			});

			return {
				success: true,
				reportId: job.data.reportId,
				analysisCount: analysisResults.length,
			};
		}

		return { success: false, error: 'Unknown step' };
	} catch (error) {
		// WaitingChildrenError is not a real error—it's a signal to pause
		if (error instanceof WaitingChildrenError) {
			throw error;
		}

		// All other errors trigger retry from current step
		throw error;
	}
}

Critical pattern notes:

  1. Update step BEFORE waiting: job.updateData() before moveToWaitingChildren()
  2. WaitingChildrenError is special: Don't catch it—let it bubble up to BullMQ
  3. Check existing children: Use job.getChildrenValues() to avoid re-spawning
  4. Token parameter: Pass token to moveToWaitingChildren() for proper locking

#Production-Ready Example: Multi-Stage Data Pipeline

Let's build a realistic data processing pipeline that:

  1. Validates input data
  2. Spawns parallel transformation jobs
  3. Waits for transformations to complete
  4. Aggregates results
  5. Publishes to downstream systems
import { Worker, Job, Queue, WaitingChildrenError } from 'bullmq';

// ================================================
// Job Data Interfaces
// ================================================

interface PipelineJobData {
	pipelineId: string;
	inputFiles: string[];
	outputDestination: string;
	step?: 'validate' | 'transform' | 'wait-transform' | 'aggregate' | 'publish' | 'complete';
	validatedData?: any[];
	aggregatedResult?: any;
}

interface TransformJobData {
	pipelineId: string;
	fileData: any;
	transformationType: string;
}

interface PipelineResult {
	success: boolean;
	pipelineId: string;
	processedFiles: number;
	outputLocation?: string;
	error?: string;
}

// ================================================
// Queue Setup
// ================================================

const redisConfig = {
	host: process.env.REDIS_HOST || 'localhost',
	port: parseInt(process.env.REDIS_PORT || '6379'),
};

const pipelineQueue = new Queue('data-pipeline', {
	connection: redisConfig,
});

const transformQueue = new Queue('data-transform', {
	connection: redisConfig,
});

// ================================================
// Main Pipeline Processor
// ================================================

async function processPipelineJob(job: Job<PipelineJobData>, token?: string): Promise<PipelineResult> {
	const step = job.data.step || 'validate';
	const startTime = Date.now();

	try {
		// ================================================
		// STEP 1: Validate Input Data
		// ================================================
		if (step === 'validate') {
			await job.updateProgress(10);
			await job.log(`Starting validation for ${job.data.inputFiles.length} files`);

			// Load and validate all input files
			const validationResults = await Promise.all(
				job.data.inputFiles.map(async (filePath) => {
					const data = await loadFile(filePath);
					const isValid = await validateDataSchema(data);

					if (!isValid) {
						throw new Error(`Invalid data schema in file: ${filePath}`);
					}

					return { filePath, data };
				}),
			);

			await job.log(`Validated ${validationResults.length} files successfully`);

			// Save validated data and advance to transform step
			await job.updateData({
				...job.data,
				validatedData: validationResults,
				step: 'transform',
			});

			// Continue to next step immediately (no waiting)
			return processPipelineJob(job, token);
		}

		// ================================================
		// STEP 2: Spawn Transformation Child Jobs
		// ================================================
		if (step === 'transform') {
			await job.updateProgress(30);

			const validatedData = job.data.validatedData;
			if (!validatedData || validatedData.length === 0) {
				throw new Error('No validated data found');
			}

			await job.log(`Spawning ${validatedData.length} transformation jobs`);

			// Check which transform jobs already exist (from previous retry)
			const existingChildren = await job.getChildrenValues();
			const existingTransforms = Object.keys(existingChildren);

			// Only spawn transform jobs that don't already exist
			const newTransformJobs = validatedData
				.filter((_, index) => {
					const jobName = `transform-${job.data.pipelineId}-${index}`;
					return !existingTransforms.includes(jobName);
				})
				.map((item, index) =>
					transformQueue.add(
						`transform-${job.data.pipelineId}-${index}`,
						{
							pipelineId: job.data.pipelineId,
							fileData: item.data,
							transformationType: 'standard',
						} as TransformJobData,
						{
							parent: {
								id: job.id!,
								queue: job.queueQualifiedName,
							},
							priority: 1,
						},
					),
				);

			if (newTransformJobs.length > 0) {
				await Promise.all(newTransformJobs);
				await job.log(`Spawned ${newTransformJobs.length} new transformation jobs`);
			} else {
				await job.log('All transformation jobs already exist');
			}

			// Update to next step BEFORE waiting
			await job.updateData({
				...job.data,
				step: 'wait-transform',
			});

			// Pause until all transform children complete
			const shouldWait = await job.moveToWaitingChildren(token ?? '');
			if (shouldWait) {
				throw new WaitingChildrenError();
			}

			// If we reach here, children completed synchronously (unlikely)
			await job.log('Transform jobs completed');
		}

		// ================================================
		// STEP 3: Aggregate Transformation Results
		// ================================================
		if (step === 'wait-transform') {
			await job.updateProgress(70);
			await job.log('Collecting transformation results');

			// Check if any child jobs failed
			await checkChildJobFailures(job);

			// Collect results from all transform jobs
			const childrenValues = await job.getChildrenValues();
			const transformResults = Object.values(childrenValues);

			await job.log(`Aggregating ${transformResults.length} transformation results`);

			// Aggregate and combine results
			const aggregatedResult = await aggregateTransformations(transformResults, job.data.pipelineId);

			// Save aggregated result to job data
			await job.updateData({
				...job.data,
				aggregatedResult,
				step: 'publish',
			});

			// Continue to publish step
			return processPipelineJob(job, token);
		}

		// ================================================
		// STEP 4: Publish Results
		// ================================================
		if (step === 'publish') {
			await job.updateProgress(90);
			await job.log('Publishing results to destination');

			const { aggregatedResult, outputDestination, pipelineId } = job.data;

			if (!aggregatedResult) {
				throw new Error('No aggregated result found');
			}

			// Publish to external system (S3, database, API, etc.)
			const outputLocation = await publishResults(aggregatedResult, outputDestination, pipelineId);

			await job.log(`Results published to ${outputLocation}`);

			// Update to final step
			await job.updateData({
				...job.data,
				step: 'complete',
			});

			return processPipelineJob(job, token);
		}

		// ================================================
		// STEP 5: Complete
		// ================================================
		if (step === 'complete') {
			await job.updateProgress(100);

			const duration = Date.now() - startTime;
			await job.log(`Pipeline completed in ${duration}ms`);

			return {
				success: true,
				pipelineId: job.data.pipelineId,
				processedFiles: job.data.validatedData?.length || 0,
				outputLocation: job.data.outputDestination,
			};
		}

		// Fallback for unknown step
		throw new Error(`Unknown step: ${step}`);
	} catch (error) {
		// Don't treat WaitingChildrenError as a real error
		if (error instanceof WaitingChildrenError) {
			await job.log('Job paused, waiting for child jobs to complete');
			throw error;
		}

		// Log actual errors
		const duration = Date.now() - startTime;
		await job.log(`Job failed at step '${step}' after ${duration}ms: ${error.message}`);

		// BullMQ will retry from current step
		throw error;
	}
}

// ================================================
// Transform Job Processor (Child Job)
// ================================================

async function processTransformJob(job: Job<TransformJobData>): Promise<any> {
	await job.updateProgress(0);

	const { fileData, transformationType } = job.data;

	try {
		// Simulate transformation work
		const transformed = await applyTransformation(fileData, transformationType);

		await job.updateProgress(100);

		return {
			success: true,
			data: transformed,
			transformationType,
		};
	} catch (error) {
		throw error;
	}
}

// ================================================
// Helper Functions
// ================================================

async function loadFile(filePath: string): Promise<any> {
	// Simulate file loading
	return { path: filePath, content: 'file data' };
}

async function validateDataSchema(data: any): Promise<boolean> {
	// Simulate schema validation
	return data && data.content;
}

async function aggregateTransformations(results: any[], pipelineId: string): Promise<any> {
	// Combine transformation results
	return {
		pipelineId,
		totalRecords: results.length,
		data: results.map((r) => r.data),
	};
}

async function publishResults(result: any, destination: string, pipelineId: string): Promise<string> {
	// Simulate publishing to external system
	const outputPath = `${destination}/${pipelineId}/output.json`;
	// await uploadToS3(outputPath, result);
	return outputPath;
}

async function applyTransformation(data: any, type: string): Promise<any> {
	// Simulate data transformation
	return { ...data, transformed: true, type };
}

async function checkChildJobFailures(job: Job): Promise<void> {
	// Get all child jobs
	const children = await job.getChildren();

	for (const child of children.completed || []) {
		const childJob = await Job.fromId(transformQueue, child.id);

		if (childJob?.isFailed()) {
			throw new Error(`Child job ${childJob.id} failed: ${childJob.failedReason}`);
		}
	}
}

// ================================================
// Worker Initialization
// ================================================

const pipelineWorker = new Worker('data-pipeline', processPipelineJob, {
	connection: redisConfig,
	concurrency: 2, // Process 2 pipelines concurrently
});

const transformWorker = new Worker('data-transform', processTransformJob, {
	connection: redisConfig,
	concurrency: 10, // Process 10 transforms concurrently
});

// Event handlers
pipelineWorker.on('completed', (job) => {
	console.log(`Pipeline ${job.id} completed`);
});

pipelineWorker.on('failed', (job, error) => {
	console.error(`Pipeline ${job?.id} failed at step ${job?.data.step}:`, error.message);
});

transformWorker.on('completed', (job) => {
	console.log(`Transform ${job.id} completed`);
});

transformWorker.on('failed', (job, error) => {
	console.error(`Transform ${job?.id} failed:`, error.message);
});

// ================================================
// Usage Example
// ================================================

async function startPipeline() {
	const job = await pipelineQueue.add('process-pipeline', {
		pipelineId: 'pipeline-123',
		inputFiles: ['/data/input1.json', '/data/input2.json', '/data/input3.json'],
		outputDestination: 's3://output-bucket',
	});

	console.log(`Started pipeline job ${job.id}`);
}

#Best Practices for Step-Based State Machines

#1. Always Update Step Before Pausing

// ✅ CORRECT: Update step first
await job.updateData({ ...job.data, step: 'next-step' });
const shouldWait = await job.moveToWaitingChildren(token ?? '');
if (shouldWait) {
	throw new WaitingChildrenError();
}

// ❌ WRONG: Update step after waiting (retry starts at wrong step)
const shouldWait = await job.moveToWaitingChildren(token ?? '');
if (shouldWait) {
	await job.updateData({ ...job.data, step: 'next-step' }); // This won't execute!
	throw new WaitingChildrenError();
}

#2. Check for Existing Children Before Spawning

Avoid duplicate child jobs on retry:

const existingChildren = await job.getChildrenValues();
const existingJobNames = Object.keys(existingChildren);

const newJobs = items
	.filter((item) => !existingJobNames.includes(`job-${item.id}`))
	.map((item) =>
		queue.add(`job-${item.id}`, item, {
			parent: { id: job.id!, queue: job.queueQualifiedName },
		}),
	);

if (newJobs.length > 0) {
	await Promise.all(newJobs);
}

#3. Store Intermediate Results in Job Data

Preserve work between steps:

// Save results after expensive operation
const processedData = await expensiveOperation(job.data.input);

await job.updateData({
	...job.data,
	processedData, // Available on retry
	step: 'next-step',
});

#4. Use Granular Progress Reporting

Update progress per step for better observability:

const progressByStep = {
	validate: 10,
	process: 50,
	notify: 90,
	complete: 100,
};

await job.updateProgress(progressByStep[step]);

#5. Implement Idempotent Steps

Steps should be safe to retry:

async function saveToDatabase(job: Job) {
	// Check if already saved
	const exists = await checkIfExists(job.data.id);
	if (exists) {
		// Skip save, already done
		return;
	}

	// Perform save
	await db.insert(job.data);
}

#6. Don't Catch WaitingChildrenError

Let BullMQ handle the special error:

try {
	// ... step logic ...

	if (shouldWait) {
		throw new WaitingChildrenError();
	}
} catch (error) {
	// ✅ CORRECT: Re-throw WaitingChildrenError
	if (error instanceof WaitingChildrenError) {
		throw error;
	}

	// Handle other errors
	throw error;
}

#7. Validate Child Job Success

Check child results before proceeding:

async function checkChildJobFailures(job: Job): Promise<void> {
	const children = await job.getChildren();

	for (const childRef of children.completed || []) {
		const childJob = await Job.fromId(queue, childRef.id);

		if (childJob?.isFailed()) {
			throw new Error(`Child job ${childJob.id} failed: ${childJob.failedReason}`);
		}
	}
}

#Monitoring and Observability

Track step transitions for debugging:

async function processWithLogging(job: Job) {
	const step = job.data.step || 'start';

	// Log step entry
	await job.log(`Entering step: ${step}`);

	try {
		// ... step logic ...

		await job.log(`Completed step: ${step}`);
	} catch (error) {
		await job.log(`Failed step: ${step} - ${error.message}`);
		throw error;
	}
}

Use structured logging for production:

import { createLogger } from 'winston';

const logger = createLogger({
	defaultMeta: {
		service: 'pipeline-worker',
		jobId: job.id,
		step: job.data.step,
	},
});

logger.info('Step completed', {
	duration: Date.now() - stepStartTime,
	recordsProcessed: results.length,
});

#Key Takeaways

  • Step pattern enables resumable workflows: Jobs remember progress and resume from failure points
  • WaitingChildrenError pauses execution: Use for coordinating parent-child job flows
  • Update step BEFORE waiting: Critical for correct retry behavior
  • Check existing children: Avoid duplicate work on retry
  • Store intermediate results: Preserve expensive computations in job data
  • Separate concerns by step: Each step has a single responsibility
  • Progress tracking per step: Better observability and user experience
  • Idempotent steps are safer: Design steps to handle repeated execution

Step-based state machines transform fragile, all-or-nothing jobs into resilient workflows that gracefully handle failures. This pattern is essential for production systems processing high-value, long-running operations where losing progress is unacceptable.

Ready to build fault-tolerant distributed systems? Start by refactoring your longest-running BullMQ jobs into discrete steps. Your future self (and your infrastructure costs) will thank you.

Enjoyed this article? Stay updated: