# @banyanai/platform-job-queue

Production-ready job queue abstraction over BullMQ for the Banyan Platform. Provides background job processing, bulk operations, scheduled tasks, and automatic telemetry integration.
## Installation

```bash
pnpm add @banyanai/platform-job-queue
```

## Overview

The job queue package wraps BullMQ to provide:
- Background Job Processing: Asynchronous execution of long-running tasks
- Bulk Operations: Efficient handling of large-scale operations (e.g., 20k+ user enrollments)
- Scheduled Tasks: Recurring jobs with cron patterns or one-time scheduled jobs
- Retry Logic: Automatic retries with exponential/fixed backoff
- Progress Tracking: Real-time job progress monitoring
- Horizontal Scaling: Independent worker scaling
- Automatic Telemetry: Built-in logging, metrics, and tracing via OpenTelemetry
## Infrastructure

Uses existing Redis infrastructure (no additional services required):

```yaml
redis:
  image: redis:7-alpine
  ports:
    - "6379:6379"
```

## Main Exports
### Classes

- `JobQueue<TData, TResult>` - Queue management and job creation
- `Worker<TData, TResult>` - Job processing with concurrency control
- `JobScheduler` - Scheduled and recurring jobs
- `JobMetrics` - Queue metrics and monitoring

### Types

- `Job<TData, TResult>` - Job interface with progress tracking
- `JobOptions` - Job configuration options
- `QueueOptions` - Queue configuration options
- `WorkerOptions` - Worker configuration options
- `ScheduledJobOptions` - Scheduling configuration options
- `JobState` - Job state enumeration
- `QueueStats` - Queue statistics interface

### Utilities

- `setGlobalRedisConfig()` - Configure the Redis connection globally
- `parseRedisConnectionString()` - Parse Redis connection strings
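For illustration, a minimal sketch of using `parseRedisConnectionString()`; the exact return shape is an assumption here, but it is expected to produce a `RedisConnection`-compatible object:

```typescript
import { parseRedisConnectionString, JobQueue } from '@banyanai/platform-job-queue';

// Hypothetical connection string; host, password, and db are parsed out of the URL.
const connection = parseRedisConnectionString('redis://:secret@redis.internal:6380/1');

const queue = new JobQueue('parsed-connection-example', { connection });
```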
## API Reference

### JobQueue

Main queue management class for adding jobs and managing queues.

#### Constructor

```typescript
constructor(name: string, options: Partial<QueueOptions>)
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `string` | Yes | Unique queue name |
| `options` | `Partial<QueueOptions>` | Yes | Queue configuration |
Options:
```typescript
interface QueueOptions {
  // Required: Redis connection
  connection: RedisConnection;

  // Optional: Default options for all jobs
  defaultJobOptions?: JobOptions;

  // Optional: Redis key prefix (default: 'banyan:jobs')
  prefix?: string;

  // Optional: Rate limiting
  limiter?: {
    max: number;           // Max jobs
    duration: number;      // Per duration (ms)
    bounceBack?: boolean;  // Bounce back when limited
  };

  // Optional: Enable telemetry (default: true)
  enableTelemetry?: boolean;
}
```

Redis Connection:

```typescript
interface RedisConnection {
  host: string;                    // Redis host URL or connection string
  port?: number;                   // Redis port (default: 6379)
  password?: string;               // Redis password
  db?: number;                     // Redis database number (default: 0)
  tls?: Record<string, unknown>;   // TLS configuration
  maxRetriesPerRequest?: number;   // Max retry attempts
  enableOfflineQueue?: boolean;    // Enable offline queue
}
```
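For illustration, a queue configured with several of these options; the queue name and values are arbitrary:

```typescript
import { JobQueue } from '@banyanai/platform-job-queue';

// Illustrative values only; adjust the connection and limits for your deployment.
const reportQueue = new JobQueue('report-generation', {
  connection: { host: 'redis://localhost:6379' },
  prefix: 'banyan:jobs',                   // Redis key prefix (the documented default)
  defaultJobOptions: { attempts: 3 },      // Applied to every job added to this queue
  limiter: { max: 50, duration: 1000 },    // At most 50 jobs per second
  enableTelemetry: true
});
```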
#### Methods

##### add()

Add a single job to the queue.

```typescript
async add(
  name: string,
  data: TData,
  opts?: JobOptions
): Promise<Job<TData, TResult>>
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `string` | Yes | Job name/type |
| `data` | `TData` | Yes | Job payload data |
| `opts` | `JobOptions` | No | Job-specific options |
Job Options:
```typescript
interface JobOptions {
  priority?: number;        // Priority (1 = highest)
  delay?: number;           // Delay in milliseconds
  attempts?: number;        // Max retry attempts
  backoff?: {
    type: 'fixed' | 'exponential';
    delay: number;          // Backoff delay (ms)
  };
  timeout?: number;         // Execution timeout (ms)
  removeOnComplete?: boolean | {
    age?: number;           // Keep for N seconds
    count?: number;         // Keep last N jobs
  };
  removeOnFail?: boolean | {
    age?: number;
    count?: number;
  };
  jobId?: string;           // Custom job ID
}
```

Returns: Promise resolving to the created Job instance
Example:
```typescript
const job = await queue.add('send-email', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Welcome to our platform!'
}, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000
  }
});

console.log(`Job created: ${job.id}`);
```

##### addBulk()
Add multiple jobs to the queue efficiently.

```typescript
async addBulk(
  jobs: BulkJobData<TData>[]
): Promise<Job<TData, TResult>[]>
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `jobs` | `BulkJobData<TData>[]` | Yes | Array of job configurations |
Bulk Job Data:
```typescript
interface BulkJobData<TData> {
  name: string;        // Job name/type
  data: TData;         // Job payload
  opts?: JobOptions;   // Job-specific options
}
```

Returns: Promise resolving to an array of created Job instances
Example:
```typescript
const jobs = await queue.addBulk([
  { name: 'enroll-user', data: { userId: 'user-1', courseId: 'course-123' } },
  { name: 'enroll-user', data: { userId: 'user-2', courseId: 'course-123' } },
  // ... up to 20,000+ jobs
]);

console.log(`Added ${jobs.length} jobs`);
```

##### getJob()
Retrieve a job by ID.

```typescript
async getJob(jobId: string): Promise<Job<TData, TResult> | null>
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `jobId` | `string` | Yes | Job identifier |
Returns: Promise resolving to Job or null if not found
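Example (a minimal sketch; the job ID is hypothetical):

```typescript
const job = await queue.getJob('job-42');   // hypothetical job ID

if (job) {
  console.log(`Job ${job.name} progress:`, job.progress);
} else {
  console.log('Job not found');
}
```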
##### getJobCounts()

Get the count of jobs in each state.

```typescript
async getJobCounts(): Promise<Record<string, number>>
```

Returns: Promise resolving to job counts by state:
```typescript
{
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: number;
}
```

##### pause()

Pause the queue (stop processing new jobs).

```typescript
async pause(): Promise<void>
```

##### resume()

Resume a paused queue.

```typescript
async resume(): Promise<void>
```

##### close()

Close the queue connection.

```typescript
async close(): Promise<void>
```
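A minimal sketch of pausing a queue around a maintenance window; `runMaintenance` is a hypothetical placeholder:

```typescript
// Stop handing out new jobs while maintenance runs; active jobs finish normally.
await queue.pause();

await runMaintenance();   // hypothetical maintenance task

// Resume processing once maintenance is done.
await queue.resume();
```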
### Worker

Job processing class with concurrency control and automatic telemetry.

#### Constructor

```typescript
constructor(
  queueName: string,
  processor: JobProcessor<TData, TResult>,
  options: WorkerOptions
)
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `queueName` | `string` | Yes | Name of queue to process |
| `processor` | `JobProcessor<TData, TResult>` | Yes | Job processing function |
| `options` | `WorkerOptions` | Yes | Worker configuration |
Job Processor:
```typescript
type JobProcessor<TData, TResult> = (
  job: Job<TData, TResult>
) => Promise<TResult>
```

Worker Options:

```typescript
interface WorkerOptions {
  // Required: Redis connection
  connection: RedisConnection;

  // Optional: Concurrency (default: 1)
  concurrency?: number;

  // Optional: Rate limiter
  limiter?: {
    max: number;
    duration: number;
    groupKey?: string;
  };

  // Optional: Job lock duration (ms, default: 30000)
  lockDuration?: number;

  // Optional: Lock renewal interval (ms, default: lockDuration / 2)
  lockRenewTime?: number;

  // Optional: Skip delayed jobs (default: false)
  skipDelayedJobs?: boolean;

  // Optional: Max stalled count (default: 1)
  maxStalledCount?: number;

  // Optional: Stalled check interval (ms, default: 30000)
  stalledInterval?: number;

  // Optional: Auto-run worker (default: true)
  autorun?: boolean;

  // Optional: Skip retry delay (default: false)
  skipRetryDelay?: boolean;

  // Optional: Enable telemetry (default: true)
  enableTelemetry?: boolean;

  // Optional: Enable metrics (default: true)
  enableMetrics?: boolean;
}
```

#### Methods
##### on()

Register event handlers for worker events.

```typescript
on(event: string, handler: (...args: unknown[]) => void): void
```

Worker Events:
- `completed` - Job completed successfully

  ```typescript
  worker.on('completed', (job: Job, result: TResult) => {
    console.log(`Job ${job.id} completed:`, result);
  });
  ```

- `failed` - Job failed

  ```typescript
  worker.on('failed', (job: Job | undefined, error: Error) => {
    console.error(`Job ${job?.id} failed:`, error);
  });
  ```

- `progress` - Job progress updated

  ```typescript
  worker.on('progress', (job: Job, progress: number | object) => {
    console.log(`Job ${job.id} progress:`, progress);
  });
  ```

- `active` - Job started processing

  ```typescript
  worker.on('active', (job: Job) => {
    console.log(`Job ${job.id} started`);
  });
  ```

- `stalled` - Job stalled (stuck in processing)

  ```typescript
  worker.on('stalled', (jobId: string) => {
    console.warn(`Job ${jobId} stalled`);
  });
  ```

- `error` - Worker error

  ```typescript
  worker.on('error', (error: Error) => {
    console.error('Worker error:', error);
  });
  ```

- `drained` - Queue is empty

  ```typescript
  worker.on('drained', () => {
    console.log('Queue is empty');
  });
  ```

- `closed` - Worker closed

  ```typescript
  worker.on('closed', () => {
    console.log('Worker closed');
  });
  ```
##### close()

Close the worker and stop processing jobs.

```typescript
async close(): Promise<void>
```

### JobScheduler
Scheduled and recurring job management.

#### Constructor

```typescript
constructor(queueName: string, options: Partial<QueueOptions>)
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `queueName` | `string` | Yes | Name of queue for scheduled jobs |
| `options` | `Partial<QueueOptions>` | Yes | Queue configuration |
#### Methods

##### addRecurring()

Add a recurring job with a cron pattern.

```typescript
async addRecurring(
  name: string,
  data: unknown,
  options: ScheduledJobOptions
): Promise<void>
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `string` | Yes | Job name/type |
| `data` | `unknown` | Yes | Job payload data |
| `options` | `ScheduledJobOptions` | Yes | Scheduling options |
Scheduled Job Options:
```typescript
interface ScheduledJobOptions {
  // For recurring jobs
  pattern?: string;    // Cron pattern (e.g., '0 2 * * *')
  timezone?: string;   // Timezone (e.g., 'America/New_York')

  // For one-time scheduled jobs
  date?: Date;         // Execution date
}
```

Example:

```typescript
// Run every day at 2 AM EST
await scheduler.addRecurring(
  'daily-report',
  { reportType: 'summary' },
  { pattern: '0 2 * * *', timezone: 'America/New_York' }
);
```

Common Cron Patterns:

```typescript
'0 * * * *'     // Every hour
'0 0 * * *'     // Every day at midnight
'0 2 * * *'     // Every day at 2 AM
'0 0 * * 0'     // Every Sunday at midnight
'0 0 1 * *'     // First day of month at midnight
'*/15 * * * *'  // Every 15 minutes
```

##### addScheduled()
Add a one-time scheduled job.

```typescript
async addScheduled(
  name: string,
  data: unknown,
  options: ScheduledJobOptions
): Promise<void>
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `string` | Yes | Job name/type |
| `data` | `unknown` | Yes | Job payload data |
| `options` | `ScheduledJobOptions` | Yes | Scheduling options with date |
Example:
```typescript
// Run once at specific date/time
await scheduler.addScheduled(
  'reminder',
  { message: 'Meeting tomorrow', userId: 'user-123' },
  { date: new Date('2025-10-30T10:00:00Z') }
);
```

##### close()

Close the scheduler.

```typescript
async close(): Promise<void>
```

### JobMetrics
Queue metrics and monitoring with telemetry integration.

#### Constructor

```typescript
constructor(queueName: string, options: Partial<QueueOptions>)
```

Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `queueName` | `string` | Yes | Name of queue to monitor |
| `options` | `Partial<QueueOptions>` | Yes | Queue configuration |
#### Methods

##### getStats()

Get current queue statistics.

```typescript
async getStats(): Promise<QueueStats>
```

Returns: Promise resolving to queue statistics:
```typescript
interface QueueStats {
  waiting: number;             // Jobs waiting to be processed
  active: number;              // Jobs currently processing
  completed: number;           // Total completed jobs
  failed: number;              // Total failed jobs
  delayed: number;             // Jobs delayed for later
  avgProcessingTime?: number;  // Average processing time (ms)
  throughput?: number;         // Jobs per second
}
```

Example:

```typescript
const stats = await metrics.getStats();
console.log(`Queue: ${stats.active} active, ${stats.waiting} waiting`);
```

Automatic Telemetry:

When `enableTelemetry: true` (default), metrics are automatically published to OpenTelemetry:

- `job_queue_waiting_jobs` - Waiting jobs gauge
- `job_queue_active_jobs` - Active jobs gauge
- `job_queue_completed_jobs_total` - Completed jobs counter
- `job_queue_failed_jobs_total` - Failed jobs counter
- `job_queue_delayed_jobs` - Delayed jobs gauge
- `job_queue_avg_processing_time_ms` - Average processing time
- `job_queue_throughput` - Jobs per second throughput
##### close()

Close the metrics collector.

```typescript
async close(): Promise<void>
```

### Job Interface

The Job interface represents a job in the queue.
```typescript
interface Job<TData = unknown, TResult = unknown> {
  // Job identification
  id: string;                  // Unique job identifier
  name: string;                // Job name/type

  // Job data
  data: TData;                 // Job payload data
  opts: JobOptions;            // Job options

  // Job state
  progress: number | Record<string, unknown>;  // Job progress
  returnvalue?: TResult;       // Job result after completion
  failedReason?: string;       // Failure reason if failed
  attemptsMade: number;        // Number of attempts made

  // Timestamps
  timestamp: number;           // Created timestamp
  processedOn?: number;        // Started processing timestamp
  finishedOn?: number;         // Finished timestamp

  // Methods
  updateProgress(progress: number | Record<string, unknown>): Promise<void>;
  remove(): Promise<void>;
  retry(): Promise<void>;
  getState(): Promise<JobState>;
  moveToFailed(error: Error | { message: string }, token: string): Promise<void>;
}
```

Job State:

```typescript
type JobState =
  | 'waiting'    // In queue, not started
  | 'active'     // Currently processing
  | 'completed'  // Successfully completed
  | 'failed'     // Failed after retries
  | 'delayed'    // Waiting for delay to expire
  | 'paused';    // Queue paused
```
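A brief sketch of acting on a retrieved job via these methods; the job ID is hypothetical:

```typescript
const job = await queue.getJob('job-42');   // hypothetical job ID

if (job) {
  const state = await job.getState();

  if (state === 'failed') {
    console.warn(`Retrying job ${job.id}: ${job.failedReason}`);
    await job.retry();       // Re-queue the failed job
  } else if (state === 'completed') {
    await job.remove();      // Clean up a finished job
  }
}
```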
## Usage Examples

### Basic Queue and Worker

```typescript
import { JobQueue, Worker } from '@banyanai/platform-job-queue';

// Create queue
const queue = new JobQueue('email-notifications', {
  connection: {
    host: process.env.REDIS_URL || 'redis://localhost:6379'
  },
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

// Create worker
const worker = new Worker(
  'email-notifications',
  async (job) => {
    const { to, subject, body } = job.data;

    // Update progress
    await job.updateProgress(50);

    // Send email (your implementation)
    await sendEmail(to, subject, body);

    // Update progress
    await job.updateProgress(100);

    return { sent: true, timestamp: new Date() };
  },
  {
    connection: {
      host: process.env.REDIS_URL || 'redis://localhost:6379'
    },
    concurrency: 5  // Process 5 jobs concurrently
  }
);

// Add event handlers
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error);
});

// Add jobs
await queue.add('send-email', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Welcome to our platform!'
});
```

### Bulk Operations
```typescript
import { JobQueue } from '@banyanai/platform-job-queue';

const queue = new JobQueue('user-enrollment', {
  connection: { host: 'redis://localhost:6379' }
});

// Enroll 20,000 users in a course
const enrollments = users.map(user => ({
  name: 'enroll-user',
  data: {
    userId: user.id,
    courseId: 'course-123',
    enrolledAt: new Date()
  }
}));

// Add all jobs in one operation
const jobs = await queue.addBulk(enrollments);
console.log(`Queued ${jobs.length} enrollments`);
```

### Scheduled and Recurring Jobs
```typescript
import { JobScheduler, Worker } from '@banyanai/platform-job-queue';

// Create scheduler
const scheduler = new JobScheduler('scheduled-tasks', {
  connection: { host: 'redis://localhost:6379' }
});

// Run daily report at 2 AM EST
await scheduler.addRecurring(
  'daily-report',
  { reportType: 'summary', recipients: ['admin@example.com'] },
  { pattern: '0 2 * * *', timezone: 'America/New_York' }
);

// Run weekly report every Monday
await scheduler.addRecurring(
  'weekly-report',
  { reportType: 'detailed' },
  { pattern: '0 0 * * 1', timezone: 'America/New_York' }
);

// One-time reminder
await scheduler.addScheduled(
  'meeting-reminder',
  { message: 'Meeting at 3 PM', userId: 'user-123' },
  { date: new Date('2025-10-30T14:45:00Z') }
);

// Create worker for scheduled tasks
const worker = new Worker(
  'scheduled-tasks',
  async (job) => {
    if (job.name === 'daily-report') {
      await generateDailyReport(job.data);
    } else if (job.name === 'weekly-report') {
      await generateWeeklyReport(job.data);
    } else if (job.name === 'meeting-reminder') {
      await sendReminder(job.data);
    }
  },
  {
    connection: { host: 'redis://localhost:6379' },
    concurrency: 2
  }
);
```

### Progress Tracking
```typescript
import { Worker } from '@banyanai/platform-job-queue';

const worker = new Worker(
  'data-processing',
  async (job) => {
    const { items } = job.data;
    const total = items.length;
    const startTime = Date.now();  // Used below for the ETA calculation

    for (let i = 0; i < items.length; i++) {
      await processItem(items[i]);

      // Update progress as percentage
      await job.updateProgress((i + 1) / total * 100);

      // Or update with detailed progress object
      await job.updateProgress({
        percentage: (i + 1) / total * 100,
        processed: i + 1,
        total: total,
        currentItem: items[i].name,
        estimatedTimeRemaining: calculateETA(i, total, startTime)
      });
    }

    return { processedCount: total };
  },
  {
    connection: { host: 'redis://localhost:6379' }
  }
);

// Monitor progress
worker.on('progress', (job, progress) => {
  if (typeof progress === 'number') {
    console.log(`Job ${job.id}: ${progress}%`);
  } else {
    console.log(`Job ${job.id}:`, progress);
  }
});
```

### Job Priority and Delay
```typescript
import { JobQueue } from '@banyanai/platform-job-queue';

const queue = new JobQueue('tasks', {
  connection: { host: 'redis://localhost:6379' }
});

// High priority job (processed first)
await queue.add('critical-task', { data: 'important' }, {
  priority: 1  // 1 = highest priority
});

// Normal priority job
await queue.add('normal-task', { data: 'regular' }, {
  priority: 10
});

// Delayed job (execute in 1 hour)
await queue.add('delayed-task', { data: 'later' }, {
  delay: 60 * 60 * 1000  // 1 hour in milliseconds
});

// Delayed job (execute at specific time)
const executeAt = new Date('2025-10-30T15:00:00Z');
const delay = executeAt.getTime() - Date.now();

await queue.add('scheduled-task', { data: 'specific-time' }, {
  delay: delay
});
```

### Retry Configuration
```typescript
import { JobQueue } from '@banyanai/platform-job-queue';

const queue = new JobQueue('api-calls', {
  connection: { host: 'redis://localhost:6379' },
  defaultJobOptions: {
    // Retry up to 5 times
    attempts: 5,

    // Exponential backoff: 2s, 4s, 8s, 16s, 32s
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

// Job with custom retry strategy
await queue.add('flaky-api-call', { url: 'https://api.example.com' }, {
  attempts: 3,
  backoff: {
    type: 'fixed',
    delay: 5000   // Wait 5 seconds between retries
  },
  timeout: 30000  // 30 second timeout per attempt
});
```

### Job Cleanup
```typescript
import { JobQueue } from '@banyanai/platform-job-queue';

const queue = new JobQueue('cleanup-example', {
  connection: { host: 'redis://localhost:6379' },
  defaultJobOptions: {
    // Remove completed jobs after 1 day or keep last 1000
    removeOnComplete: {
      age: 86400,   // 1 day in seconds
      count: 1000   // Keep last 1000 completed jobs
    },

    // Keep failed jobs indefinitely for debugging
    removeOnFail: false
  }
});

// Job-specific cleanup
await queue.add('temp-job', { data: 'temporary' }, {
  removeOnComplete: true,  // Remove immediately when completed
  removeOnFail: true       // Remove immediately when failed
});
```

### Monitoring and Metrics
```typescript
import { JobMetrics } from '@banyanai/platform-job-queue';

const metrics = new JobMetrics('email-notifications', {
  connection: { host: 'redis://localhost:6379' }
});

// Get current stats
const stats = await metrics.getStats();
console.log({
  waiting: stats.waiting,
  active: stats.active,
  completed: stats.completed,
  failed: stats.failed,
  delayed: stats.delayed
});

// Monitor continuously
setInterval(async () => {
  const stats = await metrics.getStats();

  if (stats.failed > 100) {
    console.warn('High failure rate detected!');
  }

  if (stats.waiting > 1000) {
    console.warn('Queue backlog detected!');
  }
}, 60000);  // Check every minute
```

### Global Configuration
```typescript
import { setGlobalRedisConfig, JobQueue, Worker } from '@banyanai/platform-job-queue';

// Set global Redis config
setGlobalRedisConfig({
  host: process.env.REDIS_URL || 'redis://localhost:6379',
  password: process.env.REDIS_PASSWORD,
  db: 0
});

// Now all queues and workers can use simplified config
const queue = new JobQueue('notifications', {
  connection: {}  // Uses global config
});

const worker = new Worker(
  'notifications',
  async (job) => { /* ... */ },
  {
    connection: {}  // Uses global config
  }
);
```

### Rate Limiting
```typescript
import { JobQueue, Worker } from '@banyanai/platform-job-queue';

// Limit queue to 100 jobs per 60 seconds
const queue = new JobQueue('api-calls', {
  connection: { host: 'redis://localhost:6379' },
  limiter: {
    max: 100,          // Maximum 100 jobs
    duration: 60000,   // Per 60 seconds
    bounceBack: false  // Don't retry when rate limited
  }
});

// Limit worker to 10 jobs per second
const worker = new Worker(
  'api-calls',
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: 'redis://localhost:6379' },
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000,
      groupKey: 'api-rate-limit'  // Shared across workers
    }
  }
);
```

### Error Handling
```typescript
import { Worker } from '@banyanai/platform-job-queue';

const worker = new Worker(
  'error-prone-tasks',
  async (job) => {
    try {
      // Your job processing logic
      const result = await processJob(job.data);
      return result;
    } catch (error) {
      // Log error (automatically logged if telemetry enabled)
      console.error(`Job ${job.id} error:`, error);

      // Optionally mark job as failed with custom error
      if (error instanceof UnrecoverableError) {
        await job.moveToFailed(
          { message: `Unrecoverable: ${error.message}` },
          'worker-token'
        );
      }

      // Re-throw to trigger retry
      throw error;
    }
  },
  {
    connection: { host: 'redis://localhost:6379' },
    enableTelemetry: true  // Automatic error logging
  }
);

// Handle worker-level errors
worker.on('error', (error) => {
  console.error('Worker error:', error);
  // Alert operations team
});

// Handle failed jobs
worker.on('failed', (job, error) => {
  if (job && job.attemptsMade >= 3) {
    console.error(`Job ${job.id} failed permanently:`, error);
    // Send to dead letter queue or alert
  }
});
```

## Integration with Platform Services
### With BaseService

```typescript
import { BaseService } from '@banyanai/platform-base-service';
import { JobQueue, Worker } from '@banyanai/platform-job-queue';

class MyService {
  private queue: JobQueue;
  private worker: Worker;

  async start() {
    // Initialize queue
    this.queue = new JobQueue('my-service-jobs', {
      connection: {
        host: process.env.REDIS_URL || 'redis://localhost:6379'
      }
    });

    // Initialize worker
    this.worker = new Worker(
      'my-service-jobs',
      this.processJob.bind(this),
      {
        connection: {
          host: process.env.REDIS_URL || 'redis://localhost:6379'
        },
        concurrency: 5
      }
    );

    // Start service
    await BaseService.start({
      name: 'my-service',
      version: '1.0.0'
    });
  }

  private async processJob(job: Job) {
    // Job processing logic with full service context
    return { processed: true };
  }

  async stop() {
    await this.worker.close();
    await this.queue.close();
  }
}
```

### With Telemetry
```typescript
import { JobQueue, Worker } from '@banyanai/platform-job-queue';
import { Logger, MetricsManager } from '@banyanai/platform-telemetry';

// Telemetry is automatic when enableTelemetry: true (default)
const queue = new JobQueue('telemetry-example', {
  connection: { host: 'redis://localhost:6379' },
  enableTelemetry: true  // Default
});

const worker = new Worker(
  'telemetry-example',
  async (job) => {
    // All job execution automatically logged and metered
    // Custom logging still available
    Logger.info('Processing special job', { jobId: job.id });

    return { processed: true };
  },
  {
    connection: { host: 'redis://localhost:6379' },
    enableTelemetry: true,  // Automatic logging
    enableMetrics: true     // Automatic metrics
  }
);

// Metrics automatically recorded:
// - job_queue_jobs_added_total
// - job_queue_bulk_jobs_added_total
// - job_queue_job_duration_ms
// - job_queue_job_executions_total
// - job_queue_waiting_jobs
// - job_queue_active_jobs
// - job_queue_completed_jobs_total
// - job_queue_failed_jobs_total
```

## Best Practices
- ✅ Use descriptive job names for monitoring and debugging
- ✅ Configure appropriate retry strategies for each job type
- ✅ Set job timeouts to prevent stuck jobs
- ✅ Use bulk operations for large-scale job creation
- ✅ Monitor queue metrics and set up alerts
- ✅ Use job priorities for critical tasks
- ✅ Clean up completed jobs to prevent Redis bloat
- ✅ Use concurrency to scale processing
- ✅ Handle errors gracefully in job processors
- ✅ Update job progress for long-running tasks
### DON’T:

- ❌ Don’t create jobs for every tiny operation (use jobs for async/long-running tasks)
- ❌ Don’t disable telemetry in production (loses observability)
- ❌ Don’t set unlimited retries (can cause infinite loops)
- ❌ Don’t store large payloads in job data (use references instead)
- ❌ Don’t forget to close queues and workers on shutdown (see the shutdown sketch after this list)
- ❌ Don’t ignore failed jobs (set up monitoring and alerts)
- ❌ Don’t use blocking operations in job processors
- ❌ Don’t share Redis connections across different queue systems
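A minimal shutdown sketch, assuming the `queue`, `worker`, and `scheduler` instances from the examples above are in scope:

```typescript
// Close job-queue resources before the process exits so in-flight jobs
// are not left locked and Redis connections are released cleanly.
async function shutdown(): Promise<void> {
  await worker.close();      // Stop taking new jobs, finish active ones
  await scheduler.close();   // Stop scheduling recurring jobs
  await queue.close();       // Release the queue's Redis connection
  process.exit(0);
}

process.on('SIGTERM', () => { void shutdown(); });
process.on('SIGINT', () => { void shutdown(); });
```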
## Troubleshooting

### Jobs Not Processing

```typescript
// Check worker is running
worker.on('ready', () => {
  console.log('Worker is ready and listening');
});

// Check queue stats
const stats = await queue.getJobCounts();
console.log('Queue stats:', stats);

// Check if queue is paused
await queue.resume();
```

### High Failure Rate
Section titled “High Failure Rate”// Increase retry attemptsawait queue.add('job', data, { attempts: 5, backoff: { type: 'exponential', delay: 5000 }});
// Add timeoutawait queue.add('job', data, { timeout: 60000 // 1 minute timeout});
// Monitor failuresworker.on('failed', (job, error) => { console.error(`Job ${job?.id} failed:`, { name: job?.name, attempts: job?.attemptsMade, error: error.message });});Queue Backlog
Section titled “Queue Backlog”// Increase worker concurrencyconst worker = new Worker('queue', processor, { connection: { host: 'redis://localhost:6379' }, concurrency: 20 // Process more jobs concurrently});
// Add more worker instancesconst worker2 = new Worker('queue', processor, options);const worker3 = new Worker('queue', processor, options);
// Monitor backlogconst stats = await metrics.getStats();if (stats.waiting > 1000) { console.warn('Queue backlog detected, scale up workers');}Memory Issues
Section titled “Memory Issues”// Configure job cleanupconst queue = new JobQueue('queue', { connection: { host: 'redis://localhost:6379' }, defaultJobOptions: { removeOnComplete: { age: 3600, // Keep for 1 hour count: 100 // Keep last 100 }, removeOnFail: { age: 86400, // Keep failures for 1 day count: 500 } }});Connection Issues
Section titled “Connection Issues”// Configure connection retriesconst queue = new JobQueue('queue', { connection: { host: 'redis://localhost:6379', maxRetriesPerRequest: 3, enableOfflineQueue: true // Queue commands while offline }});
// Handle connection errorsworker.on('error', (error) => { if (error.message.includes('ECONNREFUSED')) { console.error('Redis connection refused'); // Alert operations team }});Performance Considerations
### Throughput Optimization

```typescript
// Use bulk operations for adding many jobs
const jobs = items.map(item => ({
  name: 'process-item',
  data: item
}));
await queue.addBulk(jobs);  // Much faster than individual adds

// Increase worker concurrency
const worker = new Worker('queue', processor, {
  connection: { host: 'redis://localhost:6379' },
  concurrency: 50  // Process 50 jobs concurrently
});

// Horizontal scaling (multiple worker instances)
// Deploy multiple worker processes/containers
```

### Memory Efficiency
```typescript
// Store references, not large data
await queue.add('process-file', {
  fileId: 'file-123',   // Store ID
  bucket: 's3-bucket'
});
// Not: { fileContent: Buffer.alloc(10000000) }

// Configure cleanup aggressively
removeOnComplete: { count: 10 }  // Keep only last 10
```

## Related Resources
- Job Queue Guide - Comprehensive job queue usage guide
- Telemetry Package - Automatic logging and metrics
- BullMQ Documentation - Underlying BullMQ library
- Redis Best Practices - Redis optimization