Node.js background job processing with bull - separate producer and consumers

Mohsen Kamrani
Dec 31, 2021

There are many reasons to need a background job processing mechanism in your application, most often because a job takes longer than we can reasonably handle within an HTTP request-response cycle.

For example, generating a report and publishing the results can take so long that doing it within an HTTP request-response cycle is nearly impossible. Running a batch processing job every time a certain file is uploaded to Dropbox is another case where background job processing comes in handy.

In a series of tutorials I'm showing you how to use a fantastic tool named bull to handle different scenarios of background processing. Each example can be read as a standalone tutorial, so you don't need to refer to my other tutorials to understand what's going on. This also means that you might find a few things repeated across these tutorials, which you can easily spot and skip.

In this example we set up bull and Redis and see how easily we can have separate processes that produce and consume jobs. Although this separation doesn't imply any particular architecture, I'll assume there is a server that acts as the producer and there are multiple workers that act as consumers.

Let's first create two folders server and worker to hold the code for the server and the worker services.

We start by preparing the server part. First things first, we create our package.json inside the server folder:

npm init -y

In this tutorial I want to introduce bull itself, so we won't bother with creating a shiny server and leave that for another tutorial. Let's create a file named server.js inside the server folder and fill it with a basic setup:

const http = require('http');
const port = process.env.PORT || 9090;

const server = http.createServer();
server.listen(port);
console.log(`Listening on port: ${port}`)

Now that we have something to run, let's install nodemon to make it easier to see the results as soon as we modify any files:

npm install nodemon

To get nodemon up and running there is one more tiny step, which is to update our package.json file by adding a start script like this:

"start": "nodemon server.js",

This is what the scripts section of package.json looks like so far:

...
"scripts": {
    "start": "nodemon server.js"
  },
...

Now we're ready to add the part where our server schedules some jobs to be processed. We start by installing the bull package:

npm install bull --save

Bull uses Redis as its backbone to manage the jobs, so here we simply start a Redis container like this:

docker run -p 6379:6379 -d redis
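
Before wiring up bull, it can help to confirm that the Redis container is actually reachable. The following is only a minimal sketch and not part of the original project: pingRedis is a hypothetical helper built on Node's built-in net module, and its defaults assume the host and port from the docker command above and a Redis instance without a password.

```javascript
const net = require('net');

// Hypothetical helper: sends an inline PING to Redis and resolves to true
// when the reply starts with "+PONG", false on error, timeout or any other reply.
function pingRedis(host = '127.0.0.1', port = 6379, timeoutMs = 2000) {
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    let settled = false;

    // Resolve only once and always release the socket
    const finish = (ok) => {
      if (settled) return;
      settled = true;
      socket.destroy();
      resolve(ok);
    };

    socket.setTimeout(timeoutMs, () => finish(false));
    socket.on('connect', () => socket.write('PING\r\n'));
    socket.on('data', (chunk) => finish(chunk.toString().startsWith('+PONG')));
    socket.on('error', () => finish(false));
    socket.on('close', () => finish(false));
  });
}
```

With the container from the docker command running, calling pingRedis().then(console.log) should print true.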

Let's set some variables to use later in our code:

const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds;
const queueName = 'routine_jobs';

Now we create a new queue:

const Queue = require('bull');
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });

So far it's been pretty self-explanatory. Just notice that we can also provide a password when creating the queue if our Redis instance requires it:

const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost, password: 'something_super_secure'  } });
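
Hardcoding a password like this is fine for a demo, but in practice you would probably read it from the environment like the host and port. Here is a minimal sketch of that idea. The buildRedisOptions helper and the REDIS_PASSWORD variable name are my own assumptions for illustration; neither comes from bull.

```javascript
// Hypothetical helper: builds the `redis` options object from environment variables.
// REDIS_PASSWORD is an assumed variable name, chosen to match REDIS_HOST and REDIS_PORT.
function buildRedisOptions(env = process.env) {
  const options = {
    host: env.REDIS_HOST || '127.0.0.1',
    // Environment variables are strings, so convert the port to a number
    port: Number(env.REDIS_PORT) || 6379,
  };
  // Only set the password when one is provided
  if (env.REDIS_PASSWORD) {
    options.password = env.REDIS_PASSWORD;
  }
  return options;
}

// Usage (requires bull and a running Redis):
// const routineJobsQueue = new Queue(queueName, { redis: buildRedisOptions() });
```

Keeping this in one function also makes it easier to use exactly the same connection settings in the server and in the workers.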

Now let's simulate producing jobs by adding an arbitrary job to the queue at equal intervals:

let count = 0;
setInterval(async () => {
  const job = {
    jobId: count,
    value: count,
    jobType: 'routine'
  };
  await routineJobsQueue.add(job);
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);

It's worth mentioning that all the properties of the job are arbitrary; bull passes this object to the consumer as job.data.
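
One thing the interval callback above doesn't handle is a failing add call, for example when Redis is temporarily unreachable; the rejected promise would go unhandled. The following is a sketch of one way to guard against that, under my own assumptions: buildRoutineJob and scheduleJob are hypothetical helper names, and queue can be any object with an async add(data) method, such as our bull queue.

```javascript
// Hypothetical helper: builds the payload for one routine job.
// The property names are the same arbitrary ones used in this tutorial.
function buildRoutineJob(count) {
  return { jobId: count, value: count, jobType: 'routine' };
}

// Hypothetical helper: adds one job and reports a failure instead of
// leaving a rejected promise unhandled. Resolves to true on success.
async function scheduleJob(queue, count, log = console) {
  try {
    await queue.add(buildRoutineJob(count));
    log.log(`scheduled job: ${count}`);
    return true;
  } catch (err) {
    log.error(`failed to schedule job ${count}: ${err.message}`);
    return false;
  }
}

// Usage (requires bull and a running Redis):
// let count = 0;
// setInterval(() => scheduleJob(routineJobsQueue, count++), intervalInMilli);
```

Passing the queue in as a parameter is a deliberate choice here: it lets you try the function with a fake queue, without Redis running.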

Finally, we start listening on the specified port to better simulate a server process. In the next tutorial I'll explain how to add a UI for bull to our server, which is where this part really comes in handy. You might also be thinking about having an endpoint that adds jobs to the queue; I'll cover that in another tutorial too, so stay tuned.

const server = http.createServer();
server.listen(port);

Ultimately, this is what our server.js looks like:

const http = require('http');
const Queue = require('bull');

const port = process.env.PORT || 9090;
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds

// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });

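// The jobs are completed in the worker processes, so we listen to the global event.
// Global events receive bull's own job id and the result serialized as a string.
routineJobsQueue.on('global:completed', function (jobId, result) {
  console.log(`job ${jobId} completed with result: ${result}`);
});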

// Generate a routine job every second
let count = 0;
setInterval(async () => {
  await routineJobsQueue.add({
    jobId: count,
    value: count,
    jobType: 'routine'
  });
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);


const server = http.createServer();
server.listen(port);
console.log(`Listening on port: ${port}`)

Now let's write the worker code that processes these jobs. Notice that having the workers in separate processes not only helps you avoid issues caused by CPU-intensive jobs, which can impact bull's behaviour, it also makes it possible to scale the number of worker processes up or down (even automatically) depending on the load.

In the worker folder, we again create the package.json, install bull and nodemon, and update the scripts section of package.json like this:

...
"scripts": {
    "start": "nodemon index.js"
  },
...

Now we again start by creating the queue. Notice that it doesn't cause any issues if the queue has already been created by the server or even by another worker; this is actually how we get a handle to our queue to start fetching jobs from it.

const Queue = require('bull');
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;

// The queue name is exactly the same as what we set in the server. Notice that you must connect to the same redis instance too.
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });

Now that we created the queue in our worker too, it's time to add the function to process the jobs:

routineJobsQueue.process(async function (job) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  return ({ t2: jobData.value * 2, t3: jobData.value * 3 });
});

The process method supports callbacks too. To use one, declare the processor function with two parameters, function (job, done), where done is a function that takes two arguments: error and result. So, this code could also be written like this:

routineJobsQueue.process(function (job, done) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});
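
Both variants above only cover the happy path. To report a failure, a promise-based processor rejects (for example by throwing inside an async function), while a callback-based one passes the error as the first argument to done. Here is a minimal sketch of both. The function names and the validation rule are my own assumptions for illustration, and the processors are written as standalone functions so they can be tried without Redis.

```javascript
// Hypothetical promise-based processor: throwing inside the async function
// rejects the returned promise, which is how the failure gets reported.
async function processRoutineJob(job) {
  const { jobId, value } = job.data;
  if (typeof value !== 'number' || Number.isNaN(value)) {
    throw new Error(`job ${jobId} has an invalid value: ${value}`);
  }
  return { t2: value * 2, t3: value * 3 };
}

// Hypothetical callback-based equivalent: the error goes to done as the
// first argument, the result as the second.
function processRoutineJobWithCallback(job, done) {
  processRoutineJob(job).then(
    (result) => done(null, result),
    (err) => done(err)
  );
}

// Usage (requires bull and a running Redis):
// routineJobsQueue.process(processRoutineJob);
// routineJobsQueue.on('failed', (job, err) => console.error(err.message));
```

Either function can be passed to process as is; pick whichever style matches the rest of your code.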

Finally, this is what our index.js looks like:

const Queue = require('bull');
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;

// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });

routineJobsQueue.process(async function (job) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  return ({ t2: jobData.value * 2, t3: jobData.value * 3 });
});

Time to test the code and see the end result. Open a shell, go to the worker folder and run the program:

cd worker && npm start

Then, in another shell, we go to the server folder and run the program:

cd server && npm start

Soon I'll update this post and show you how you can simply host the application with utopiops, so stay tuned as you might get some exciting coupon codes too ;)

Finally, you can find the complete source code here on GitHub.

 