
Multi-LLM Orchestration: Running 5 AI Providers in Parallel with BullMQ
Build fault-tolerant AI pipelines that query OpenAI, Anthropic, Gemini, Perplexity, and DeepSeek simultaneously. Handle partial failures gracefully.
You need to analyze a product for market research. Each AI provider brings different strengths: Perplexity excels at real-time web search, Claude at nuanced reasoning, Gemini at grounded facts, DeepSeek at cost-effective analysis, and OpenAI at general-purpose tasks. Why choose one when you can use them all?
The challenge isn't calling five APIs — it's handling the inevitable failures. Rate limits, capacity errors, network timeouts. When you're orchestrating multiple LLMs, partial failures are guaranteed. The question is whether your system degrades gracefully or crashes entirely.
This guide shows how to build a fault-tolerant multi-LLM pipeline using BullMQ's parent-child job pattern. We'll query all five providers in parallel, handle individual failures without losing successful results, and aggregate responses for downstream processing.
The Use Case: Competitive Analysis Report
Imagine building a tool that generates competitive analysis reports. For each product, you want perspectives from multiple AI providers:
- OpenAI GPT-4o: General market positioning analysis
- Anthropic Claude: Deep reasoning about competitive advantages
- Google Gemini: Web-grounded factual data with citations
- Perplexity: Real-time competitive intelligence with sources
- DeepSeek: Cost-effective supplementary analysis
Each provider might take 5-30 seconds to respond. Some might fail due to rate limits or capacity issues. You need results from at least 3 providers to generate a useful report, but you want all 5 if possible.
Architecture: Parent-Child Job Pattern
BullMQ's parent-child pattern is perfect for this. The orchestrator (parent) spawns individual LLM jobs (children), waits for them to complete, and aggregates results — even when some children fail.
orchestrator-job (parent)
├── openai-query-job (child)
├── anthropic-query-job (child)
├── gemini-query-job (child)
├── perplexity-query-job (child)
└── deepseek-query-job (child)
Key features:
- Parallel execution: All 5 LLM jobs run simultaneously
- Independent retries: Each child retries on its own schedule
- Partial success: Parent continues even if some children fail
- Result preservation: Successful results survive sibling failures
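For comparison, BullMQ also ships a FlowProducer that can declare a parent and all of its children in a single call. This guide adds children manually from inside the orchestrator instead (shown later), because the parent needs step logic and selective re-spawning on retry. A minimal sketch of the same tree with FlowProducer, using a placeholder payload, looks like this:
import { FlowProducer } from 'bullmq';
import Redis from 'ioredis';

const flowProducer = new FlowProducer({ connection: new Redis(process.env.REDIS_URL!) });

// Placeholder payload; the real LLMQueryJobData shape is defined in the next section.
const jobData = { analysisId: 'demo-analysis', prompt: 'Analyze ...' };

// Declare the whole parent/child tree up front (two children shown).
await flowProducer.add({
  name: 'analyze',
  queueName: 'llm-orchestrator',
  data: { analysisId: 'demo-analysis' },
  children: [
    { name: 'query-openai', queueName: 'openai-query', data: jobData, opts: { ignoreDependencyOnFailure: true } },
    { name: 'query-anthropic', queueName: 'anthropic-query', data: jobData, opts: { ignoreDependencyOnFailure: true } },
    // ... remaining providers
  ],
});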
Setting Up the Queues
First, define queues for each LLM provider with appropriate retry strategies:
// queues/llm-queues.ts
import { Queue } from 'bullmq';
import Redis from 'ioredis';
export const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // required by the BullMQ workers that share this connection
});
// Shared retry configuration for LLM jobs
const llmJobOptions = {
attempts: 5,
backoff: {
type: 'exponential',
delay: 60_000, // 1 minute base delay
},
removeOnComplete: 100,
removeOnFail: 50,
};
export interface LLMQueryJobData {
analysisId: string;
productName: string;
productDescription: string;
competitors: string[];
prompt: string;
}
export type LLMProvider = 'openai' | 'anthropic' | 'gemini' | 'perplexity' | 'deepseek';
export interface LLMQueryResult {
success: boolean;
provider: LLMProvider;
content: string;
tokens: { input: number; output: number };
error?: string;
}
// Parent orchestrator queue
export const orchestratorQueue = new Queue('llm-orchestrator', {
connection,
defaultJobOptions: llmJobOptions,
});
// Child queues for each provider
export const openaiQueue = new Queue<LLMQueryJobData>('openai-query', {
connection,
defaultJobOptions: llmJobOptions,
});
export const anthropicQueue = new Queue<LLMQueryJobData>('anthropic-query', {
connection,
defaultJobOptions: llmJobOptions,
});
export const geminiQueue = new Queue<LLMQueryJobData>('gemini-query', {
connection,
defaultJobOptions: llmJobOptions,
});
export const perplexityQueue = new Queue<LLMQueryJobData>('perplexity-query', {
connection,
defaultJobOptions: llmJobOptions,
});
export const deepseekQueue = new Queue<LLMQueryJobData>('deepseek-query', {
connection,
defaultJobOptions: llmJobOptions,
});
SDK Configuration: 5 Providers, 4 Patterns
Before diving into workers, let's understand how each provider's SDK works. The good news: three of them use OpenAI-compatible APIs, reducing the complexity significantly.
Pattern 1: Native OpenAI SDK
OpenAI uses their own SDK directly. See the official OpenAI Node.js SDK documentation.
// npm install openai
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
const response = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: prompt }],
temperature: 0.3,
});
// Extract response
const content = response.choices[0]?.message?.content || '';
const tokens = {
input: response.usage?.prompt_tokens || 0,
output: response.usage?.completion_tokens || 0,
};
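One optional tweak worth considering (not part of the original setup): the openai SDK retries failed requests on its own, twice by default. Since BullMQ already owns retries in this pipeline, you may prefer to disable SDK-level retries and set an explicit timeout so a hung request fails fast enough for the queue's backoff to take over. The same options apply to the Perplexity and DeepSeek clients below, since they reuse this SDK.
// Sketch: let BullMQ own retries; fail fast at the SDK level.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
  maxRetries: 0, // BullMQ's attempts/backoff handle retries
  timeout: 60_000, // abort requests that hang for more than a minute
});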
Pattern 2: OpenAI SDK with Custom Base URL
Both Perplexity and DeepSeek provide OpenAI-compatible APIs. You use the same openai package but point it to their servers.
Perplexity (OpenAI Compatibility Guide):
import OpenAI from 'openai';
const perplexity = new OpenAI({
apiKey: process.env.PERPLEXITY_API_KEY,
baseURL: 'https://api.perplexity.ai',
});
const response = await perplexity.chat.completions.create({
model: 'sonar-pro', // Perplexity's search-enhanced model
messages: [{ role: 'user', content: prompt }],
temperature: 0.2,
});
// Perplexity adds citations to the response (not in OpenAI types)
const perplexityResponse = response as typeof response & { citations?: string[] };
const citationsCount = perplexityResponse.citations?.length || 0;
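The shared result shape used later in this guide only carries content and token counts, so those citations would otherwise be dropped. One option, sketched here, is to fold the source URLs into the returned text:
// Sketch: keep Perplexity's sources by appending them to the content string.
const content = response.choices[0]?.message?.content || '';
const citations = perplexityResponse.citations ?? [];
const contentWithSources =
  citations.length > 0
    ? `${content}\n\nSources:\n${citations.map((url, i) => `[${i + 1}] ${url}`).join('\n')}`
    : content;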
DeepSeek (API Documentation):
import OpenAI from 'openai';
const deepseek = new OpenAI({
apiKey: process.env.DEEPSEEK_API_KEY,
baseURL: 'https://api.deepseek.com', // or https://api.deepseek.com/v1
});
const response = await deepseek.chat.completions.create({
model: 'deepseek-chat', // DeepSeek-V3.2 (non-thinking mode)
// model: 'deepseek-reasoner', // DeepSeek-V3.2 (thinking mode)
messages: [{ role: 'user', content: prompt }],
temperature: 0.3,
});
Cost advantage: DeepSeek offers competitive pricing compared to GPT-4o while maintaining strong performance. Perplexity's sonar-pro includes real-time web search built-in.
Pattern 3: Anthropic SDK
Anthropic has their own SDK with a different response structure. See the official Anthropic TypeScript SDK.
// npm install @anthropic-ai/sdk
import Anthropic from '@anthropic-ai/sdk';
const anthropic = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const response = await anthropic.messages.create({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 4096,
messages: [{ role: 'user', content: prompt }],
});
// Anthropic returns content as an array of blocks
const textBlock = response.content.find((block) => block.type === 'text');
const content = textBlock?.text || '';
const tokens = {
input: response.usage.input_tokens,
output: response.usage.output_tokens,
};
Pattern 4: Google GenAI SDK
Google's Gemini uses their own SDK. See the official Google GenAI SDK.
// npm install @google/genai
import { GoogleGenAI } from '@google/genai';
const gemini = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY });
const response = await gemini.models.generateContent({
model: 'gemini-2.0-flash',
contents: prompt,
config: {
tools: [{ googleSearch: {} }], // Enable web grounding (optional)
temperature: 0.3,
},
});
const content = response.text || '';
const tokens = {
input: response.usageMetadata?.promptTokenCount || 0,
output: response.usageMetadata?.candidatesTokenCount || 0,
};
The Generic LLM Worker Pattern
Instead of duplicating worker code for each provider, create a generic worker factory:
// workers/create-llm-worker.ts
import { Worker, Job } from 'bullmq';
import { connection } from '../queues/llm-queues';
import type { LLMQueryJobData, LLMQueryResult, LLMProvider } from '../queues/llm-queues';
type QueryFunction = (prompt: string) => Promise<{ content: string; tokens: { input: number; output: number } }>;
export function createLLMWorker(
queueName: string,
provider: LLMProvider,
queryFn: QueryFunction,
options: { concurrency: number }
) {
return new Worker<LLMQueryJobData, LLMQueryResult>(
queueName,
async (job: Job<LLMQueryJobData>): Promise<LLMQueryResult> => {
const { prompt } = job.data;
try {
const { content, tokens } = await queryFn(prompt);
return {
success: true,
provider,
content,
tokens,
};
} catch (error) {
// Let BullMQ handle retry - throw the error
throw error;
}
},
{
connection,
concurrency: options.concurrency,
}
);
}
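Concurrency alone does not cap requests per minute. If a provider enforces hard rate limits, BullMQ workers also accept a limiter option that throttles the whole queue. A sketch of how the factory could expose it, living in the same file so QueryFunction, LLMProvider, and connection are in scope (the limiter values callers pass are placeholders, not real provider limits):
// Sketch: same factory, extended with BullMQ's per-queue rate limiter.
export function createRateLimitedLLMWorker(
  queueName: string,
  provider: LLMProvider,
  queryFn: QueryFunction,
  options: { concurrency: number; limiter?: { max: number; duration: number } }
) {
  return new Worker<LLMQueryJobData, LLMQueryResult>(
    queueName,
    async (job) => {
      const { content, tokens } = await queryFn(job.data.prompt);
      return { success: true, provider, content, tokens };
    },
    {
      connection,
      concurrency: options.concurrency,
      // e.g. { max: 30, duration: 60_000 } starts at most 30 jobs per minute
      limiter: options.limiter,
    }
  );
}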
Now instantiate workers for each provider:
// workers/index.ts
import OpenAI from 'openai';
import Anthropic from '@anthropic-ai/sdk';
import { GoogleGenAI } from '@google/genai';
import { createLLMWorker } from './create-llm-worker';
// Initialize SDK clients
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
const gemini = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY });
const perplexity = new OpenAI({ apiKey: process.env.PERPLEXITY_API_KEY, baseURL: 'https://api.perplexity.ai' });
const deepseek = new OpenAI({ apiKey: process.env.DEEPSEEK_API_KEY, baseURL: 'https://api.deepseek.com' });
// OpenAI Worker
export const openaiWorker = createLLMWorker(
'openai-query',
'openai',
async (prompt) => {
const response = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: prompt }],
temperature: 0.3,
});
return {
content: response.choices[0]?.message?.content || '',
tokens: { input: response.usage?.prompt_tokens || 0, output: response.usage?.completion_tokens || 0 },
};
},
{ concurrency: 3 }
);
// Anthropic Worker
export const anthropicWorker = createLLMWorker(
'anthropic-query',
'anthropic',
async (prompt) => {
const response = await anthropic.messages.create({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 4096,
messages: [{ role: 'user', content: prompt }],
});
const textBlock = response.content.find((block) => block.type === 'text');
return {
content: textBlock?.text || '',
tokens: { input: response.usage.input_tokens, output: response.usage.output_tokens },
};
},
{ concurrency: 3 }
);
// Gemini Worker
export const geminiWorker = createLLMWorker(
'gemini-query',
'gemini',
async (prompt) => {
const response = await gemini.models.generateContent({
model: 'gemini-2.0-flash',
contents: prompt,
config: { tools: [{ googleSearch: {} }], temperature: 0.3 },
});
return {
content: response.text || '',
tokens: {
input: response.usageMetadata?.promptTokenCount || 0,
output: response.usageMetadata?.candidatesTokenCount || 0,
},
};
},
{ concurrency: 2 } // Stricter rate limits
);
// Perplexity Worker
export const perplexityWorker = createLLMWorker(
'perplexity-query',
'perplexity',
async (prompt) => {
const response = await perplexity.chat.completions.create({
model: 'sonar-pro',
messages: [{ role: 'user', content: prompt }],
temperature: 0.2,
});
return {
content: response.choices[0]?.message?.content || '',
tokens: { input: response.usage?.prompt_tokens || 0, output: response.usage?.completion_tokens || 0 },
};
},
{ concurrency: 2 }
);
// DeepSeek Worker
export const deepseekWorker = createLLMWorker(
'deepseek-query',
'deepseek',
async (prompt) => {
const response = await deepseek.chat.completions.create({
model: 'deepseek-chat',
messages: [{ role: 'user', content: prompt }],
temperature: 0.3,
});
return {
content: response.choices[0]?.message?.content || '',
tokens: { input: response.usage?.prompt_tokens || 0, output: response.usage?.completion_tokens || 0 },
};
},
{ concurrency: 5 } // More generous limits
);
This pattern:
- Eliminates duplication — one worker template, five instances
- Isolates SDK differences — each query function handles its provider's quirks
- Simplifies testing — mock the query function, not the entire worker
- Enables easy additions — adding a sixth provider is one function call
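One refinement to consider on top of this (a sketch, not part of the factory above): not every error deserves five retries. A rate-limited request will eventually succeed, but an invalid API key never will. BullMQ's UnrecoverableError lets a processor fail a job immediately without burning the remaining attempts; you could call a helper like this from the factory's catch block:
import { UnrecoverableError } from 'bullmq';
import OpenAI from 'openai';

// Sketch: classify errors so auth/config failures skip retries entirely.
// This check only covers the OpenAI-compatible clients; other SDKs throw their own error types.
export function rethrowClassified(error: unknown): never {
  if (error instanceof OpenAI.APIError && (error.status === 401 || error.status === 403)) {
    // Bad credentials will not fix themselves; stop retrying this job.
    throw new UnrecoverableError(`Auth error: ${error.message}`);
  }
  throw error; // everything else goes through BullMQ's normal retry/backoff
}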
The Orchestrator: Handling Partial Failures
The orchestrator is where the magic happens. It spawns all child jobs, waits for completion, and gracefully handles failures using BullMQ's WaitingChildrenError pattern and ignoreDependencyOnFailure:
// workers/orchestrator-worker.ts
import { Worker, Job, WaitingChildrenError } from 'bullmq';
import { connection, openaiQueue, anthropicQueue /* ... other queues */ } from '../queues/llm-queues';
import type { LLMQueryJobData, LLMQueryResult } from '../queues/llm-queues';
interface OrchestratorJobData {
analysisId: string;
productName: string;
productDescription: string;
competitors: string[];
step?: 'spawn-children' | 'collect-results';
}
interface OrchestratorResult {
success: boolean;
analysisId: string;
results: LLMQueryResult[];
failedProviders: string[];
successCount: number;
}
export const orchestratorWorker = new Worker<OrchestratorJobData, OrchestratorResult>(
'llm-orchestrator',
async (job: Job<OrchestratorJobData>, token?: string): Promise<OrchestratorResult> => {
const { analysisId, productName, productDescription, competitors } = job.data;
let step = job.data.step || 'spawn-children';
// Build the analysis prompt
const prompt = `
Analyze the competitive position of ${productName}.
Product Description: ${productDescription}
Key Competitors: ${competitors.join(', ')}
Provide a detailed competitive analysis covering:
1. Market positioning
2. Unique value propositions
3. Competitive advantages and disadvantages
4. Market opportunities and threats
Format your response as structured analysis with clear sections.
`.trim();
const jobData: LLMQueryJobData = {
analysisId,
productName,
productDescription,
competitors,
prompt,
};
// ============================================================
// STEP 1: Spawn all LLM children in parallel
// ============================================================
if (step === 'spawn-children') {
// Check which providers already have results (from previous retry)
const existingChildren = await job.getChildrenValues();
const existingProviders = Object.values(existingChildren)
.filter((r): r is LLMQueryResult => r && typeof r === 'object' && 'provider' in r)
.map((r) => r.provider);
const queues = [
{ queue: openaiQueue, provider: 'openai' },
// ... other provider queues
];
// Only spawn jobs for providers that don't have results yet
const spawnPromises = queues
.filter(({ provider }) => !existingProviders.includes(provider))
.map(({ queue, provider }) =>
queue.add(`query-${provider}`, jobData, {
parent: { id: job.id!, queue: job.queueQualifiedName },
// CRITICAL: Allow parent to continue even if this child fails
ignoreDependencyOnFailure: true,
})
);
if (spawnPromises.length > 0) {
await Promise.all(spawnPromises);
console.log(`Spawned ${spawnPromises.length} LLM jobs (${existingProviders.length} already exist)`);
}
// Update step before waiting; keep the local copy in sync so we fall through
// to collection if every child has already finished
await job.updateData({ ...job.data, step: 'collect-results' });
step = 'collect-results';
// Pause until all children complete (success or failure)
const shouldWait = await job.moveToWaitingChildren(token!);
if (shouldWait) {
throw new WaitingChildrenError();
}
}
// ============================================================
// STEP 2: Collect results and handle partial failures
// ============================================================
if (step === 'collect-results') {
// Get all child results (both successful and failed)
const childrenValues = await job.getChildrenValues();
const ignoredFailures = await job.getIgnoredChildrenFailures();
// Extract successful results
const results = Object.values(childrenValues).filter(
(r): r is LLMQueryResult => r && typeof r === 'object' && 'provider' in r && r.success
);
// Track which providers failed
const failedProviders = Object.keys(ignoredFailures).map((key) => {
// Extract provider name from job key (e.g., "bull:openai-query:123" -> "openai")
const match = key.match(/bull:(\w+)-query:/);
return match ? match[1] : 'unknown';
});
console.log(`Collected ${results.length} successful results, ${failedProviders.length} failures`);
// Decide if we have enough results to proceed
const MIN_REQUIRED_PROVIDERS = 3;
if (results.length < MIN_REQUIRED_PROVIDERS) {
// Not enough results - throw to trigger orchestrator retry
// This will re-spawn only the failed providers (existing results preserved)
await job.updateData({ ...job.data, step: 'spawn-children' });
throw new Error(
`Only ${results.length}/${MIN_REQUIRED_PROVIDERS} required providers succeeded. ` +
`Failed: ${failedProviders.join(', ')}`
);
}
// We have enough results - return aggregated data
return {
success: true,
analysisId,
results,
failedProviders,
successCount: results.length,
};
}
throw new Error(`Unknown step: ${step}`);
},
{
connection,
concurrency: 5, // Process multiple orchestrations in parallel
}
);
The Key Pattern: ignoreDependencyOnFailure
The secret to graceful partial failures is ignoreDependencyOnFailure: true when spawning children:
await queue.add('query', jobData, {
parent: { id: job.id!, queue: job.queueQualifiedName },
ignoreDependencyOnFailure: true, // Parent resumes even if this child fails
});
When a child job exhausts all its retry attempts and finally fails:
- BullMQ moves it to the "ignored" category (not "failed")
- The parent job resumes (instead of waiting forever)
- You can inspect failures via job.getIgnoredChildrenFailures()
- Successful sibling results remain available via job.getChildrenValues()
This is fundamentally different from Promise.all() behavior where one failure rejects everything.
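The closest in-process analogue is Promise.allSettled, which also keeps successes when siblings fail. What it lacks is everything else BullMQ provides: per-child retries with backoff, persistence across process restarts, and observability. A rough sketch of the in-process version, using hypothetical queryOpenAI/queryAnthropic helpers, shows what you'd be giving up:
// In-process analogue: partial results survive, but there are no retries and
// everything is lost if the process dies mid-analysis.
const settled = await Promise.allSettled([
  queryOpenAI(prompt), // hypothetical direct SDK wrappers
  queryAnthropic(prompt),
  // ... remaining providers
]);
const succeeded = settled
  .filter((r): r is PromiseFulfilledResult<LLMQueryResult> => r.status === 'fulfilled')
  .map((r) => r.value);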
Handling Retries: Preserving Successful Results
When the orchestrator retries (because not enough providers succeeded), you don't want to re-query providers that already returned results. The pattern is:
// Check which providers already have results
const existingChildren = await job.getChildrenValues();
const existingProviders = Object.values(existingChildren)
.filter((r): r is LLMQueryResult => r && 'provider' in r)
.map((r) => r.provider);
// Only spawn jobs for providers WITHOUT existing results
const spawnPromises = queues
.filter(({ provider }) => !existingProviders.includes(provider))
.map(({ queue }) => queue.add(...));
This means:
- First run: Spawns 5 jobs (all providers)
- Retry after 2 failures: Spawns only 2 jobs (failed providers)
- Successful results persist across retries
Utility: Checking Child Failures
Here's a reusable utility for checking child job failures:
// utils/check-child-failures.ts
import { Job } from 'bullmq';
export interface FailedChildInfo {
jobId: string;
failedReason: string;
}
export async function checkChildJobFailures(parentJob: Job): Promise<FailedChildInfo[]> {
const ignoredFailures = await parentJob.getIgnoredChildrenFailures();
if (!ignoredFailures || Object.keys(ignoredFailures).length === 0) {
return [];
}
// Track which failures we've already handled (stored in job data)
const handledFailures: string[] = parentJob.data._handledFailures || [];
// Filter out already-handled failures
const newFailures = Object.entries(ignoredFailures).filter(([jobId]) => !handledFailures.includes(jobId));
if (newFailures.length === 0) {
return [];
}
// Convert to array and update handled list
const failedChildren = newFailures.map(([jobId, reason]) => ({
jobId,
failedReason: reason || 'Unknown error',
}));
await parentJob.updateData({
...parentJob.data,
_handledFailures: [...handledFailures, ...failedChildren.map((c) => c.jobId)],
});
return failedChildren;
}
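A sketch of how the collect-results step in the orchestrator might use it (the alerting call is a hypothetical placeholder):
// Log or alert on newly failed providers exactly once per orchestrator run.
const newFailures = await checkChildJobFailures(job);
for (const failure of newFailures) {
  console.warn(`LLM child ${failure.jobId} failed: ${failure.failedReason}`);
  // await notifyOps(failure); // hypothetical alerting hook
}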
Starting an Analysis
To kick off a multi-LLM analysis:
// api/analyze.ts
import { randomUUID } from 'node:crypto';
import { orchestratorQueue } from '../queues/llm-queues';
export async function startAnalysis(params: {
productName: string;
productDescription: string;
competitors: string[];
}): Promise<{ analysisId: string; jobId: string }> {
const analysisId = randomUUID();
const job = await orchestratorQueue.add(
'analyze',
{
analysisId,
...params,
},
{
priority: 2,
// Deduplicate by product name to prevent duplicate analyses
deduplication: { id: params.productName },
}
);
return { analysisId, jobId: job.id! };
}
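startAnalysis is fire-and-forget. If a caller needs the aggregated result, one option (a sketch; a production API would more likely poll job state or push a webhook) is to wait on the orchestrator job with QueueEvents:
// api/wait-for-analysis.ts (sketch)
import { QueueEvents } from 'bullmq';
import Redis from 'ioredis';
import { orchestratorQueue } from '../queues/llm-queues';

// QueueEvents holds a blocking Redis connection, so give it its own client.
const orchestratorEvents = new QueueEvents('llm-orchestrator', {
  connection: new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }),
});

export async function waitForAnalysis(jobId: string) {
  const job = await orchestratorQueue.getJob(jobId);
  if (!job) throw new Error(`No orchestrator job found for id ${jobId}`);
  // Resolves with the OrchestratorResult returned by the orchestrator worker
  return job.waitUntilFinished(orchestratorEvents);
}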
Monitoring with Bull Board
Add Bull Board for real-time monitoring:
// server.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express from 'express';
import { orchestratorQueue, openaiQueue, anthropicQueue /* ... */ } from './queues/llm-queues';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [
new BullMQAdapter(orchestratorQueue),
// ... adapters for each provider queue
],
serverAdapter,
});
const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000);
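Bull Board gives you a dashboard; for logs and metrics, the workers themselves emit events you can hook into. A minimal sketch:
// Sketch: structured logging per provider via worker events.
import { openaiWorker, anthropicWorker /* ... other workers */ } from './workers';

for (const worker of [openaiWorker, anthropicWorker /* ... */]) {
  worker.on('completed', (job, result) => {
    console.log(`[${worker.name}] job ${job.id} completed, tokens:`, result.tokens);
  });
  worker.on('failed', (job, err) => {
    console.warn(`[${worker.name}] job ${job?.id} attempt ${job?.attemptsMade} failed: ${err.message}`);
  });
}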
Cost Optimization Tips
Running 5 LLMs per analysis adds up. Consider:
- Tiered providers: Use cheaper models (DeepSeek, Gemini Flash) for initial analysis, expensive models (Claude, GPT-4o) only when needed
- Caching: Cache responses for identical prompts using Redis
- Conditional spawning: Only query additional providers if initial results are insufficient
- Token budgets: Set max_tokens appropriately for each provider
// Example: Tiered approach
const tierOneProviders = ['deepseek', 'gemini']; // Cheaper
const tierTwoProviders = ['openai', 'anthropic', 'perplexity']; // More expensive
// Start with tier one, add tier two only if needed
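The caching bullet is also cheap to implement with the Redis connection you already have. A sketch, assuming exact-match prompts and an illustrative key scheme (withPromptCache is a hypothetical helper you would call inside each provider's query function):
// utils/prompt-cache.ts (sketch)
import { createHash } from 'node:crypto';
import { connection } from '../queues/llm-queues'; // reuse the shared ioredis client

const CACHE_TTL_SECONDS = 60 * 60 * 24; // 24 hours; tune to how fresh results must be

export async function withPromptCache<T>(
  provider: string,
  prompt: string,
  queryFn: () => Promise<T>
): Promise<T> {
  const key = `llm-cache:${provider}:${createHash('sha256').update(prompt).digest('hex')}`;
  const cached = await connection.get(key);
  if (cached) return JSON.parse(cached) as T;
  const fresh = await queryFn();
  await connection.set(key, JSON.stringify(fresh), 'EX', CACHE_TTL_SECONDS);
  return fresh;
}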
Key Takeaways
- BullMQ parent-child pattern enables parallel LLM orchestration with independent retries
- ignoreDependencyOnFailure: true allows graceful handling of partial failures
- Preserve successful results across retries by checking existing children before spawning
- WaitingChildrenError pauses the parent until all children complete
- Minimum threshold strategy ensures you have enough data before proceeding
- Each provider has different rate limits — adjust concurrency accordingly
Multi-LLM orchestration isn't about redundancy — it's about leveraging each provider's strengths while building resilient systems that handle the inevitable failures of distributed API calls.