BullMQ is a widely used library for robust background job processing in Node.js, and it relies on Redis as its data store.
Let’s spin up a simple Express app and a background job. We’ll use a job that simulates sending an email.
// express-app.js
const express = require('express');
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');

const app = express();
const port = 3000;

// Configure Redis connection
const connection = new Redis({
  host: 'localhost', // Default Redis host
  port: 6379, // Default Redis port
  maxRetriesPerRequest: null, // BullMQ expects this on connections used by workers
});

// Create a new Queue instance
const emailQueue = new Queue('emailQueue', { connection });

// Define a worker to process jobs from the 'emailQueue'
const worker = new Worker('emailQueue', async (job) => {
  console.log(`Processing job ${job.id} with data:`, job.data);
  // Simulate sending an email
  await new Promise(resolve => setTimeout(resolve, 5000)); // Simulate network latency/work
  console.log(`Finished processing job ${job.id}`);
  return { status: 'Email sent successfully!' };
}, { connection });

// Parse JSON request bodies; without this, req.body is undefined in the route below
app.use(express.json());

// Endpoint to add a new email job to the queue
app.post('/send-email', async (req, res) => {
  const { to, subject, body } = req.body;
  if (!to || !subject || !body) {
    return res.status(400).json({ error: 'Missing required fields: to, subject, body' });
  }
  const job = await emailQueue.add('sendEmail', { to, subject, body });
  console.log(`Job ${job.id} added to the queue.`);
  res.status(201).json({ message: 'Email job added to queue', jobId: job.id });
});

// Start the Express server
app.listen(port, () => {
  console.log(`Express app listening on port ${port}`);
});
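
// Handle worker errors and completion
worker.on('completed', (job) => {
  console.log(`Job ${job.id} completed with result:`, job.returnvalue);
});
worker.on('failed', (job, err) => {
  // job can be undefined in some failure cases, so access its id defensively
  console.error(`Job ${job?.id} failed with error:`, err.message);
});
worker.on('error', (err) => {
  // Connection-level or internal worker errors, not tied to a single job
  console.error('Worker error:', err);
});

// Graceful shutdown for worker
process.on('SIGINT', async () => {
  await worker.close();
  await emailQueue.close();
  console.log('Worker and Queue closed.');
  process.exit(0);
});
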
To run this:
- Make sure you have Redis running locally (e.g., docker run -d -p 6379:6379 redis).
- Install dependencies: npm install express bullmq ioredis.
- Save the code as express-app.js.
- Run the app: node express-app.js.
- Send a POST request to http://localhost:3000/send-email with JSON body: { "to": "test@example.com", "subject": "Hello from BullMQ!", "body": "This is a test email." }
You’ll see logs in your terminal indicating the job being added and then processed by the worker.
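If you would rather send that request from a script than from curl or Postman, here is a minimal sketch using the fetch API built into Node.js 18 and later. The helper names buildEmailRequest and sendEmailJob, and the baseUrl parameter, are hypothetical and only exist for this illustration.

```javascript
// send-request.js (hypothetical helper script, not part of the app above)

// Build the fetch() arguments for the /send-email endpoint.
function buildEmailRequest(payload, baseUrl = 'http://localhost:3000') {
  return {
    url: `${baseUrl}/send-email`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    },
  };
}

// Send the request and return the parsed JSON response.
// Assumes the Express app is running and that global fetch is available (Node 18+).
async function sendEmailJob(payload, baseUrl) {
  const { url, init } = buildEmailRequest(payload, baseUrl);
  const response = await fetch(url, init);
  return response.json();
}

// Example call (run it only while the server is up):
// sendEmailJob({
//   to: 'test@example.com',
//   subject: 'Hello from BullMQ!',
//   body: 'This is a test email.',
// }).then(console.log);
```

The response should contain the message and jobId fields that the route returns.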
The core problem BullMQ solves is decoupling time-consuming or resource-intensive tasks from the main request-response cycle of your web application. When a user requests to send an email (or perform any other background operation), your Express app quickly adds a job to a queue and responds right away. A Worker, which can run on the same or a different server, picks up these jobs and executes them independently. This keeps request handlers from waiting on slow work, which makes for a snappier user experience and better overall application performance. In the example above the worker lives in the same Node.js process as Express for simplicity; in production you would typically run it as a separate process so that heavy jobs cannot compete with request handling.
BullMQ keeps all queue state in Redis, an in-memory data store with optional persistence to disk. It uses Redis lists together with other data structures, such as sorted sets and hashes, to manage the job queue, job states (waiting, active, completed, failed), and associated metadata. Workers wait for new jobs with blocking Redis commands rather than polling; the exact commands vary between BullMQ versions. The Queue class is your interface for adding jobs and interacting with the queue itself, while the Worker class is responsible for fetching jobs and executing their logic.
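To make the job states concrete, here is a toy in-memory model of the waiting, active, completed, and failed states. It is purely illustrative: the ToyQueue class and its methods are hypothetical, and BullMQ's real implementation keeps this state in Redis and handles many more cases (delays, retries, priorities).

```javascript
// Toy in-memory model of job states. Illustrative only, not how BullMQ is implemented.
class ToyQueue {
  constructor() {
    this.waiting = []; // job ids in FIFO order
    this.active = new Set();
    this.completed = new Set();
    this.failed = new Set();
    this.jobs = new Map(); // id -> { name, data }
    this.nextId = 1;
  }

  // Store the job and put its id at the back of the waiting list.
  add(name, data) {
    const id = String(this.nextId++);
    this.jobs.set(id, { name, data });
    this.waiting.push(id);
    return id;
  }

  // Move the oldest waiting job to active and return its id (or null if none).
  takeNext() {
    const id = this.waiting.shift();
    if (id === undefined) return null;
    this.active.add(id);
    return id;
  }

  // Record the outcome of an active job.
  finish(id, ok) {
    if (!this.active.delete(id)) throw new Error(`Job ${id} is not active`);
    (ok ? this.completed : this.failed).add(id);
  }

  stateOf(id) {
    if (this.waiting.includes(id)) return 'waiting';
    for (const state of ['active', 'completed', 'failed']) {
      if (this[state].has(id)) return state;
    }
    return 'unknown';
  }
}

const toy = new ToyQueue();
const toyJobId = toy.add('sendEmail', { to: 'test@example.com' });
console.log(toy.stateOf(toyJobId)); // waiting
toy.takeNext();
console.log(toy.stateOf(toyJobId)); // active
toy.finish(toyJobId, true);
console.log(toy.stateOf(toyJobId)); // completed
```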
The connection object is the bridge to your Redis instance: both the Queue and the Worker need it to communicate with Redis. The new Queue('emailQueue', { connection }) line creates a queue named emailQueue and associates it with your Redis connection. Similarly, new Worker('emailQueue', ..., { connection }) tells this worker to listen for jobs on emailQueue. The queue name is what ties the two together, so it must match exactly on both sides.
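In practice the host and port rarely stay hard-coded. Here is a minimal sketch of building the connection options from environment variables. The variable names REDIS_HOST and REDIS_PORT and the helper redisOptionsFromEnv are assumptions made for this example, not something BullMQ or ioredis defines. The maxRetriesPerRequest: null setting is included because BullMQ expects it on connections used by workers.

```javascript
// Build ioredis connection options from environment variables,
// falling back to the local defaults used in this article.
// REDIS_HOST and REDIS_PORT are assumed names; use whatever your deployment provides.
function redisOptionsFromEnv(env = process.env) {
  return {
    host: env.REDIS_HOST || 'localhost',
    port: Number.parseInt(env.REDIS_PORT || '6379', 10),
    maxRetriesPerRequest: null, // expected by BullMQ for worker connections
  };
}

const redisOptions = redisOptionsFromEnv();
console.log(`Redis target: ${redisOptions.host}:${redisOptions.port}`);

// In the app: const connection = new Redis(redisOptionsFromEnv());
```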
The worker’s callback function is where the actual "work" happens. It receives a job object, which contains the data you passed when adding the job. Inside this callback, you perform the operation – in this case, simulating an email send with setTimeout. The return value of this function becomes the job.returnvalue when the job completes successfully.
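Because the processor is just an async function, you can define it on its own and exercise it with a stand-in job object before wiring it into a Worker. The sketch below does that; sendEmailProcessor and fakeJob are hypothetical names, and the delay is shortened so the example finishes quickly.

```javascript
// A standalone processor: receives a job and returns the value
// that ends up in job.returnvalue when the job completes.
async function sendEmailProcessor(job) {
  const { to, subject, body } = job.data;
  if (!to || !subject || !body) {
    // Throwing (or rejecting) is how a processor signals failure.
    throw new Error('Missing required fields: to, subject, body');
  }
  // Simulate the email send with a short delay.
  await new Promise((resolve) => setTimeout(resolve, 50));
  return { status: 'Email sent successfully!' };
}

// Stand-in for a BullMQ job; only the fields the processor reads are provided.
const fakeJob = {
  id: '1',
  data: { to: 'test@example.com', subject: 'Hi', body: 'Hello' },
};

sendEmailProcessor(fakeJob).then((result) => {
  console.log(`Job ${fakeJob.id} would complete with:`, result);
});

// In the app: new Worker('emailQueue', sendEmailProcessor, { connection });
```

Keeping the processor separate from the Worker constructor also makes it easy to move into its own file once the worker runs as a separate process.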
BullMQ's retry mechanism goes beyond simply re-executing failed jobs. Retries are opt-in: each job can be given a maximum number of attempts and a backoff strategy, such as exponential backoff, so that transient network issues or temporary service unavailability don't lead to permanent job failures.
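Retries are configured per job through the attempts and backoff options, passed as the third argument to add. The sketch below shows such an options object plus a small helper that approximates the waits exponential backoff produces. The helper exponentialDelay is hypothetical, and the formula (the base delay doubled after each failed attempt) is an assumption about BullMQ's built-in strategy, so verify it against the version you use.

```javascript
// Job options: up to 5 attempts in total, exponential backoff starting at 1 second.
const retryOptions = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
};

// Approximate wait before the retry that follows the Nth failed attempt (N starts at 1).
// Assumed formula: delay * 2^(N - 1).
function exponentialDelay(baseDelayMs, attemptsMade) {
  return Math.round(baseDelayMs * 2 ** (attemptsMade - 1));
}

// With 5 attempts there are at most 4 retries, hence 4 waits.
const waits = [];
for (let n = 1; n < retryOptions.attempts; n += 1) {
  waits.push(exponentialDelay(retryOptions.backoff.delay, n));
}
console.log(waits); // [ 1000, 2000, 4000, 8000 ]

// In the app:
// await emailQueue.add('sendEmail', { to, subject, body }, retryOptions);
```

Only after the final attempt fails does the job stay in the failed state.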
The emailQueue.add('sendEmail', { to, subject, body }) call is the heart of job creation. The first argument, 'sendEmail', is the job name (useful for routing or differentiating job types). The second argument, { to, subject, body }, is the payload of data that will be passed to the worker. This data is serialized and stored in Redis.
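Because the payload is stored as JSON, only JSON-compatible values survive the trip to the worker. The following sketch uses a plain JSON round trip to approximate what the worker would receive in job.data; the simulateStoredPayload helper and the extra payload fields are hypothetical.

```javascript
// Approximate what job.data looks like after being serialized and read back.
function simulateStoredPayload(data) {
  return JSON.parse(JSON.stringify(data));
}

const originalPayload = {
  to: 'test@example.com',
  sendAt: new Date('2024-01-01T00:00:00.000Z'), // a Date becomes an ISO string
  onDone: () => console.log('done'), // functions are dropped
  cc: undefined, // undefined properties are dropped
};

const receivedPayload = simulateStoredPayload(originalPayload);
console.log(receivedPayload);
// { to: 'test@example.com', sendAt: '2024-01-01T00:00:00.000Z' }
```

A practical consequence is to pass identifiers and plain values in the payload, and to rebuild richer objects (dates, class instances) inside the worker.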
The worker.on('completed', ...) and worker.on('failed', ...) event listeners are how you monitor and react to job outcomes. You can use them to update databases, send notifications, or trigger subsequent actions based on whether a job succeeded or failed. Note that these listeners only fire for jobs processed by this particular worker instance. The process.on('SIGINT', ...) block demonstrates graceful shutdown: worker.close() stops the worker from picking up new jobs and waits for in-progress jobs to finish, after which the queue is closed and the process exits.
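Process managers and container runtimes usually stop a process with SIGTERM rather than SIGINT, so it can help to route both signals through one shutdown routine. Here is a minimal sketch; closeAll and registerShutdown are hypothetical helpers, and the only thing they assume about each resource is an async close() method, which both Worker and Queue provide.

```javascript
// Close resources one after another; returns the names closed, in order.
async function closeAll(resources) {
  const closed = [];
  for (const [name, resource] of resources) {
    await resource.close();
    closed.push(name);
  }
  return closed;
}

// Register one handler for both SIGINT and SIGTERM.
// Returns the handler so it can also be invoked directly.
function registerShutdown(resources, exit = (code) => process.exit(code)) {
  let shuttingDown = false;
  const handler = async () => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    const closed = await closeAll(resources);
    console.log(`Closed: ${closed.join(', ')}`);
    exit(0);
  };
  process.on('SIGINT', handler);
  process.on('SIGTERM', handler);
  return handler;
}

// In the app, close the worker first so active jobs can finish:
// registerShutdown([['worker', worker], ['queue', emailQueue]]);
```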
The next problem you’ll likely encounter is managing job priorities and ensuring that certain critical jobs are processed before others.