
bullmq-extra

orrgal1 · 85 · MIT · 0.30.0 · TypeScript support: included

Additional features and extensions for BullMQ.

bullmq, queue, pubsub, message-queue, distributed-processing, nodejs, fanout, job-queue, parallel-processing, kafka, rabbitmq, redis, redis-streams, stream-joins, stream-merges, stream-aggregations, streams, request-response


BullMQ Extra

BullMQ Extra is a set of additional features and extensions for the much-beloved BullMQ. The library currently provides specialized patterns, such as Routing and Joining, that are not available in the core BullMQ library, with more useful features to come.

Installation:

npm install bullmq-extra

Features

Router:

Routers allow you to distribute jobs from one or more source queues to one or more target queues. This is useful for implementing:

  • fan-out (1->N) patterns, where a single job is processed by multiple workers in parallel.
  • fan-in (N->1) patterns, where multiple job queues are combined and processed by a single worker.
  • fan-in to fan-out (N->N) patterns, where multiple job queues are combined and processed by multiple workers.

Under the hood, the Router component leverages Redis Streams, so you get much the same publish-subscribe capability as in Kafka, including retention, consumer groups and message replay, at a fraction of the complexity, along with the additional patterns mentioned above. Also, because everything ends up in a BullMQ queue, you can use all the features of BullMQ, such as retries and priorities. The cost/performance ratio is yet to be benchmarked.

Basic Usage:

import { Queue, Worker } from 'bullmq';
import { Router } from 'bullmq-extra';

// Create source queues somewhere in your application
const sourceQueue1 = new Queue('source1');
const sourceQueue2 = new Queue('source2');

// Create a router to process jobs from the source queue and distribute them to target queues
const targetQueue1 = new Queue('target1');
const targetQueue2 = new Queue('target2');
const router1 = new Router()
  .addSources('source1', 'source2')
  .addTargets(targetQueue1, targetQueue2);

// Start the router and log any failure instead of silently dropping it
router1.run().catch(console.error);

// Create multiple routers on the same source queues to different targets
const targetQueue3 = new Queue('target3');
const targetQueue4 = new Queue('target4');
const router2 = new Router()
  .addSources('source1')
  .addTargets(targetQueue3, targetQueue4);
router2.run().catch(console.error);

// Add jobs to the source queues
sourceQueue1.add('job', { data: 'data1' });
sourceQueue2.add('job', { data: 'data2' });

// The jobs will be delivered to all the target queues

Advanced Options (RouterOptions):

  • batchSize: The number of jobs to process in a single batch. Default is 1.

  • blockTimeMs: The time to wait before polling after an empty batch. Default is 1 second.

  • maxRetentionMs: The maximum time to retain a job in the router stream. Default is 24 hours.

  • trimIntervalMs: The interval in which to trim the router stream. Default is 1 minute.

  • optsOverride: A function that takes the job data and returns an object with options to override the default options for the job.
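As an illustration of optsOverride, the sketch below shows a function that derives per-job options from the job data. The OrderData shape and its urgent field are hypothetical; priority and attempts are standard BullMQ job options. How the function is handed to the Router is not shown in this README, so only the function itself is sketched here.

```typescript
// Hypothetical job payload, used only for this illustration.
interface OrderData {
  orderId: string;
  urgent?: boolean;
}

// Subset of the BullMQ job options used in this sketch.
interface JobOptsOverride {
  priority?: number;
  attempts?: number;
}

// Takes the job data and returns options that override the defaults for that job.
function optsOverride(data: OrderData): JobOptsOverride {
  // In BullMQ a lower priority number is processed before a higher one.
  return data.urgent
    ? { priority: 1, attempts: 5 }
    : { priority: 10 };
}
```

Keeping the function pure (no I/O, no shared state) makes it easy to unit test separately from the router.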

Caution:

  • Beware of circular dependencies when using routers. This can lead to infinite loops which will overload your Redis.
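One way to guard against this is to check the planned routing topology for cycles before starting any router. The sketch below is a standalone helper, not part of bullmq-extra; the Route shape and the hasRoutingCycle name are assumptions made for illustration, with queue names standing in for the arguments passed to addSources and addTargets.

```typescript
// Each route lists source queue names and target queue names (hypothetical shape).
interface Route {
  sources: string[];
  targets: string[];
}

// Returns true if following routes from some queue can lead back to that queue.
function hasRoutingCycle(routes: Route[]): boolean {
  // Build an adjacency list: source queue -> set of target queues.
  const edges = new Map<string, Set<string>>();
  for (const route of routes) {
    for (const source of route.sources) {
      const out = edges.get(source) ?? new Set<string>();
      route.targets.forEach((target) => out.add(target));
      edges.set(source, out);
    }
  }

  const visiting = new Set<string>(); // queues on the current DFS path
  const done = new Set<string>(); // queues already fully explored

  const visit = (queue: string): boolean => {
    if (visiting.has(queue)) return true; // back edge: cycle found
    if (done.has(queue)) return false;
    visiting.add(queue);
    const next = Array.from(edges.get(queue) ?? []);
    if (next.some((target) => visit(target))) return true;
    visiting.delete(queue);
    done.add(queue);
    return false;
  };

  return Array.from(edges.keys()).some((queue) => visit(queue));
}
```

Running such a check at startup, over every router your application defines, turns a runtime overload into an early configuration error.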

Join:

Joins allow you to combine jobs from multiple queues into a single queue while joining them on a common key. This is a common pattern in ETL and data processing pipelines and is now available for BullMQ users as well.

Basic Usage:

import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { Join } from 'bullmq-extra';

const join = new Join({
  joinName: 'join1',
  onComplete: (data) => {
    const sum = data.reduce((acc, val) => {
      return acc + val.value;
    }, 0);
    return { sum };
  },
  redis: new IORedis(),
  sources: [
    {
      queue: 'source1',
      getJoinKey: (data) => data.joinKey,
    },
    {
      queue: 'source2',
      getJoinKey: (data) => data.joinKey,
    },
  ],
  target: new Queue('target1'),
  timeout: 1000,
});
join.run();

// Add jobs to the source queues
// (sourceQueue1 and sourceQueue2 are Queue instances for 'source1' and 'source2')
sourceQueue1.add('job', { joinKey: 'key1', value: 1 });
sourceQueue2.add('job', { joinKey: 'key1', value: 2 });

// The result of the onComplete function will be added to the target queue

IMPORTANT NOTE: Join keys must be non-recurring across the lifetime of the application. Key recurrence may cause unexpected side effects.

If the join key for a given job's data is falsy, the data will not be stored and no join will be performed for that job.
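The join semantics described above can be sketched in memory as follows. This is a conceptual illustration only, not the library's implementation: it assumes a join completes once every source has contributed a job for the same key, it ignores the timeout option, and createJoin and Payload are hypothetical names.

```typescript
// Hypothetical payload shape matching the example jobs above.
type Payload = { joinKey?: string; value: number };

// Returns an `add` function that feeds one job from one source into the join.
function createJoin<R>(sources: string[], onComplete: (data: Payload[]) => R) {
  // joinKey -> (source name -> payload received from that source)
  const pending = new Map<string, Map<string, Payload>>();

  return (source: string, data: Payload): R | undefined => {
    const key = data.joinKey;
    if (!key) return undefined; // falsy key: not stored, no join performed

    const parts = pending.get(key) ?? new Map<string, Payload>();
    parts.set(source, data);
    pending.set(key, parts);

    // Wait until every source has contributed for this key.
    if (!sources.every((s) => parts.has(s))) return undefined;

    pending.delete(key);
    return onComplete(Array.from(parts.values()));
  };
}

const add = createJoin(['source1', 'source2'], (data) => ({
  sum: data.reduce((acc, val) => acc + val.value, 0),
}));
add('source1', { joinKey: 'key1', value: 1 }); // undefined: still waiting for source2
add('source2', { joinKey: 'key1', value: 2 }); // { sum: 3 }
```

In the real component the returned value would be added to the target queue as a new job rather than returned to the caller.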

Accumulation:

Accumulations allow you to accumulate messages from a queue and output aggregations to a new queue.

Basic Usage:

import { Queue } from 'bullmq';
import { Accumulation } from 'bullmq-extra';

// `connection` is assumed to be your Redis connection, defined elsewhere in your application

const accumulation = new Accumulation({
  accumulationName: 'accumulation1',
  onComplete: (data) => {
    const sum = data.reduce((acc, val) => {
      return acc + val.value;
    }, 0);
    return { sum };
  },
  opts: { connection },
  source: {
    queue: 'source1',
    getGroupKey: (data) => data.groupKey,
    prefix: `{prefix1}`,
  },
  target: new Queue('target1'),
  timeout: 1000,
  isComplete: async (data) => {
    return data.length === 10;
  },
});
accumulation.run();

// Add jobs to the source queue
// (sourceQueue1 is a Queue instance for 'source1')
sourceQueue1.add('job', { groupKey: 'key1', value: 1 });
sourceQueue1.add('job', { groupKey: 'key1', value: 2 });

// The result of the onComplete function will be added to the target queue

IMPORTANT NOTE: Accumulation keys must be non-recurring across the lifetime of the application. Key recurrence may cause unexpected side effects.

If the group key for a given job's data is falsy, the data will not be stored and no accumulation will be performed for that job.
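The interplay of getGroupKey, isComplete and onComplete can be sketched in memory as follows. This is a conceptual illustration under stated assumptions, not the library's implementation: isComplete is synchronous here for brevity (it is async in the example above), the timeout option is ignored, and createAccumulator and Item are hypothetical names.

```typescript
// Hypothetical payload shape matching the example jobs above.
type Item = { groupKey?: string; value: number };

// Returns an `add` function that feeds one job into the accumulation.
function createAccumulator<R>(
  isComplete: (items: Item[]) => boolean,
  onComplete: (items: Item[]) => R,
) {
  // groupKey -> items collected so far for that group
  const groups = new Map<string, Item[]>();

  return (item: Item): R | undefined => {
    const key = item.groupKey;
    if (!key) return undefined; // falsy key: not stored, no accumulation

    const items = groups.get(key) ?? [];
    items.push(item);
    groups.set(key, items);

    // Keep collecting until the group is declared complete.
    if (!isComplete(items)) return undefined;

    groups.delete(key);
    return onComplete(items);
  };
}
```

For example, with isComplete set to `(items) => items.length === 3` and a summing onComplete, the third job added for a given key produces the aggregate and clears the group.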

Request-Response:

Requesters and Responders allow you to create a request-response pattern with BullMQ. The Requester sends a job to a queue and waits for a response on a dedicated response queue. The Responder listens for requests on its own queue and sends each response to the appropriate response queue.

Basic Usage:

import { Requester, Responder } from 'bullmq-extra';

// `connection` is assumed to be your Redis connection, defined elsewhere in your application

// Somewhere in your application create a responder
const responder = new Responder({
  responderName: 'additionJob',
  opts: { connection },
});
responder.processRequests((data: { a: number, b: number }) => {
  return { result: data.a + data.b };
});

// Create a requester or several in other places in your application
const requester1 = new Requester({
  requesterName: 'additionJob1',
  responderName: 'additionJob',
  opts: { connection },
});
requester1.request({ a: 1, b: 2 });
requester1.processResponses((data) => {
  console.log(data); // {result: 3}
});

const requester2 = new Requester({
  requesterName: 'additionJob2',
  responderName: 'additionJob',
  opts: { connection },
});
requester2.request({ a: 1, b: 5 });
requester2.processResponses((data) => {
  console.log(data); // {result: 6}
});
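The routing of responses to per-requester queues can be pictured with the in-memory sketch below. It is a conceptual illustration only and does not use or mirror the bullmq-extra internals; createResponder, AddRequest and AddResponse are hypothetical names, and plain arrays stand in for the dedicated response queues.

```typescript
// Hypothetical request and response shapes matching the addition example above.
type AddRequest = { requesterName: string; data: { a: number; b: number } };
type AddResponse = { result: number };

function createResponder(handler: (data: AddRequest['data']) => AddResponse) {
  // requesterName -> dedicated response queue for that requester
  const responseQueues = new Map<string, AddResponse[]>();

  return {
    // Process one request and place the response on the requester's own queue.
    handle(request: AddRequest): void {
      const queue = responseQueues.get(request.requesterName) ?? [];
      queue.push(handler(request.data));
      responseQueues.set(request.requesterName, queue);
    },
    // Read the responses destined for a single requester.
    responsesFor(requesterName: string): AddResponse[] {
      return responseQueues.get(requesterName) ?? [];
    },
  };
}
```

Because each requester reads only its own queue, several requesters can share one responder without seeing each other's results.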

Broker:

Brokers are designed to run as sidecars to processes written in languages other than Node.js and provide a thin API for producing and consuming messages from BullMQ. The broker exposes a REST API for producing and consuming messages and handles all the BullMQ specifics, such as retries and priorities.

Basic Usage:

The broker is packaged as a Docker image and should be run as a sidecar to your service.

docker pull ghcr.io/orrgal1/bullmq-broker:latest

It accepts the following environment variables:

  • REDIS_HOST: The host of the Redis server.
  • REDIS_PORT: The port of the Redis server.
  • BROKER_PORT: The port on which the broker will listen for requests.

// When the broker processes messages it will send the data to a POST callback endpoint which your service must provide
// The following example is in Node.js but the idea is to have this in another language
import axios from 'axios';

// Create a Queue called test
await axios.post('http://localhost:3003/queue', {
  name: 'test',
  opts: {},
});

// Create a Worker for the queue
await axios.post('http://localhost:3003/worker', {
  name: 'test',
  callback: 'http://localhost:3002/job', // The callback endpoint in your own service
  opts: {},
});

// Add a job to the queue. The worker will pick up this job and send the data to the callback endpoint.
// The callback endpoint must return a success status code for the job to be marked as completed.
// If the callback endpoint returns an error status code the job will be retried or discarded as per the Queue options.
await axios.post('http://localhost:3003/job', {
  name: 'test',
  data: { a: 3, b: 4 },
  opts: {},
});
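The callback endpoint itself is left to your service. A minimal sketch of its core logic is shown below as a framework-agnostic function that maps a request body to the HTTP status code returned to the broker. It assumes the broker posts the job data as a JSON body; handleJobCallback and AdditionJob are hypothetical names chosen for this example.

```typescript
// Hypothetical job payload matching the job added in the example above.
interface AdditionJob {
  a: number;
  b: number;
}

// Takes the raw POST body and returns the status code to send back to the broker.
// A success code marks the job as completed; an error code leads to a retry or
// discard, as per the Queue options.
function handleJobCallback(rawBody: string): number {
  try {
    const data = JSON.parse(rawBody) as Partial<AdditionJob>;
    if (typeof data.a !== 'number' || typeof data.b !== 'number') {
      return 400; // malformed job data
    }
    const sum = data.a + data.b; // the actual work of the job
    console.log(`processed job, sum = ${sum}`);
    return 200;
  } catch {
    return 500; // unparsable body or unexpected failure
  }
}
```

In a real service this function would be wired to the POST route whose URL was registered as the worker callback, in whatever language and HTTP framework the service uses.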

Graceful Shutdown:

To drain in-flight jobs and minimize stalled jobs, call the close method of the class you are working with. For example:

await accumulation.close(); // accumulation is an instance of Accumulation
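When a process runs several components, they can be closed together on shutdown. The helper below is a sketch, not part of bullmq-extra: Closable and closeAll are hypothetical names, and the only assumption about the components is that they expose the async close method shown above.

```typescript
// Anything with an async close method, e.g. the accumulation instance above.
interface Closable {
  close(): Promise<void>;
}

// Closes every component in order and returns how many of them failed to close.
// A failure in one component does not prevent the others from being closed.
async function closeAll(components: Closable[]): Promise<number> {
  let failures = 0;
  for (const component of components) {
    try {
      await component.close();
    } catch (err) {
      failures += 1;
      console.error('failed to close component', err);
    }
  }
  return failures;
}

// Typical wiring in a Node.js process (join and accumulation are the
// instances from the examples above):
//
// process.once('SIGTERM', async () => {
//   const failures = await closeAll([join, accumulation]);
//   process.exit(failures === 0 ? 0 : 1);
// });
```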

Thin Clients:

Thin clients interact with the broker and provide a thin, convenient API for using the full power of BullMQ + BullMQ Extra in languages other than Node.js.

Currently, the following thin clients are available:

Java

Roadmap:

  • Add more thin clients for various languages like Python and Go to bring the power of BullMQ to those languages and allow integrating BullMQ into legacy codebases.
  • Add support for all bullmq-extra patterns, such as Router, Join, Accumulation and Request-Response.
  • Support the Kafka protocol for compatibility with existing Kafka clients.

Caution:

  • The package is new so breaking changes are to be expected until version 1.0.0.

Roadmap:

  • BullMQ Connect: Similar to Kafka Connect, a way to connect BullMQ to other systems. Will probably be a separate package or several.

Contributing:

  • Feel free to open issues for questions, suggestions and feedback, as well as for actual issues.
  • To contribute code just fork and open pull requests.

Thanks! 🚀


Changelog

All notable changes to this project will be documented in this file. See conventional commits for commit guidelines.


0.30.0 - 2025-02-24

Bug Fixes

  • rm queue template types - (270287f) - noamico

Features

  • add log, fix types - (02606a8) - noamico
  • update packages - (d9dec5d) - noamico

Miscellaneous Chores

  • npm version to 0.29.0 - (f634206) - GitHub Action

0.29.0 - 2025-02-10

Features

  • limit completed and failed in target queue, align accumulator - (f715ee0) - noamico

Miscellaneous Chores

  • npm version to 0.28.0 - (3b80cfd) - GitHub Action

0.28.0 - 2025-02-05

Features

  • change join key and complete methods to async - (cdb80b6) - noamico
  • change join key and complete methods to async - (bc2a82c) - noamico

Miscellaneous Chores

  • npm version to 0.27.0 - (64f2355) - GitHub Action

0.27.0 - 2025-01-28

Features

  • add log msg and readme - (64a37a4) - noamico
  • add handling for null keys cases - (815c619) - noamico
  • add handling for null keys cases - (c48459a) - noamico
  • add handling for null keys cases - (13c1eb6) - noamico

Miscellaneous Chores

  • npm version to 0.26.0 - (6f4cc3d) - GitHub Action

0.26.0 - 2025-01-26

Features

  • add close and prefix for joiner - (28d869f) - noamico

Miscellaneous Chores

  • npm version to 0.25.0 - (d3caeaa) - GitHub Action

0.25.0 - 2025-01-21

Features

  • graceful-shutdown for accumulator - (f446217) - noamico

Miscellaneous Chores

  • npm version to 0.24.0 - (4b01599) - GitHub Action

0.24.0 - 2025-01-20

Features

  • use prefixes - (31f58aa) - noamico

Miscellaneous Chores

  • npm version to 0.23.1 - (a3c48b7) - GitHub Action

0.23.1 - 2025-01-20

Bug Fixes

  • _ instead of : in keys - (20e8bcc) - noamico

Miscellaneous Chores

  • npm version to 0.23.0 - (3e8cf9e) - GitHub Action

0.23.0 - 2025-01-20

Bug Fixes

  • const redis port - (17674f4) - noamico
  • fix queue and worker initialization - (b92f93b) - noamico

Features

  • change test to work on custom port to test connectivity - (6e04428) - noamico

Miscellaneous Chores

  • npm version to 0.22.8 - (0f9f835) - GitHub Action

0.22.8 - 2025-01-16

Bug Fixes

  • fix accumulator readme - (e107914) - noamico

Miscellaneous Chores

  • npm version to 0.22.7 - (5cef732) - GitHub Action

0.22.7 - 2025-01-14

Bug Fixes

  • README - (80e2425) - org

Miscellaneous Chores

  • npm version to 0.22.6 - (4e063c9) - GitHub Action

0.22.6 - 2025-01-14

Bug Fixes

  • README - (4a546de) - org

0.22.4 - 2024-11-16

Bug Fixes

  • README - (34cc8de) - org

Miscellaneous Chores

  • npm version to 0.22.3 - (a0b6521) - GitHub Action

0.22.3 - 2024-11-16

Bug Fixes

  • send only data to callback - (84a40e9) - org
  • send only data to callback - (abbdac2) - org

Miscellaneous Chores

  • npm version to 0.22.2 - (f2725a4) - GitHub Action

0.22.2 - 2024-11-16

Bug Fixes

  • sanitize job opts - (db03c8c) - org

Miscellaneous Chores

  • npm version to 0.22.1 - (f01a09e) - GitHub Action

0.22.1 - 2024-11-16

Bug Fixes

  • passing connection from broker to router - (caea21d) - org
  • passing connection from broker to router - (5d1cdbf) - org

Miscellaneous Chores

  • npm version to 0.22.0 - (cb8a9c8) - GitHub Action

0.22.0 - 2024-11-16

Features

  • passing connection from broker to router - (84d63a9) - org

Miscellaneous Chores

  • npm version to 0.21.0 - (5fafe50) - GitHub Action

0.21.0 - 2024-11-03

Features

  • broker docker - (f92628c) - org

Miscellaneous Chores

  • npm version to 0.20.0 - (f5717ed) - GitHub Action

0.20.0 - 2024-11-03

Features

  • broker docker - (d2eabe9) - org
  • broker docker - (1b57cd1) - org

Miscellaneous Chores

  • npm version to 0.19.0 - (abbd11a) - GitHub Action

0.19.0 - 2024-11-03

Features

  • broker - (064f252) - org

Miscellaneous Chores

  • npm version to 0.18.0 - (af48f4a) - GitHub Action

0.18.0 - 2024-11-03

Features

  • broker - (e88519a) - org

Miscellaneous Chores

  • npm version to 0.17.0 - (d45206e) - GitHub Action

0.17.0 - 2024-11-03

Features

  • broker - (c7d0021) - org

Miscellaneous Chores

  • npm version to 0.16.0 - (6c88790) - GitHub Action

0.16.0 - 2024-11-03

Features

  • broker - (a3a03c3) - org
  • broker - (cc41c49) - org
  • broker - (2515318) - org
  • broker - (14fcf03) - org
  • broker - (df3af59) - org

Miscellaneous Chores

  • npm version to 0.15.1 - (a0d3a7d) - GitHub Action

0.15.1 - 2024-11-02

Bug Fixes

  • readme - (19678d3) - org

Miscellaneous Chores

  • npm version to 0.15.0 - (257bd83) - GitHub Action

0.15.0 - 2024-11-01

Features

  • reqrep readme - (5d0f7de) - org
  • reqrep readme - (03358e0) - org
  • reqrep readme - (d0c2424) - org
  • reqrep readme - (f0cc6c7) - org
  • coded reqrep - (b3a6311) - org
  • coded reqrep - (ade5775) - org

Miscellaneous Chores

  • npm version to 0.14.0 - (3824cdf) - GitHub Action

0.14.0 - 2024-10-30

Features

  • fixed join tests - (f1372ac) - org

Miscellaneous Chores

  • npm version to 0.13.0 - (6ed0c8a) - GitHub Action

0.13.0 - 2024-10-30

Features

  • fixed join tests - (b830481) - org
  • accumulations - (4f26ec0) - org
  • accumulations - (f142201) - org
  • accumulation wip... - (d1756e9) - org

Miscellaneous Chores

  • npm version to 0.12.0 - (5c58441) - GitHub Action

0.12.0 - 2024-10-30

Features

  • single connection everywhere - (3ac1f3b) - org

Miscellaneous Chores

  • npm version to 0.11.4 - (10cd634) - GitHub Action

0.11.4 - 2024-10-30

Bug Fixes

  • join docs in readme - (96efeb4) - org

Miscellaneous Chores

  • npm version to 0.11.3 - (d5e36d2) - GitHub Action

0.11.3 - 2024-10-30

Bug Fixes

  • join docs in readme - (3ccb1a0) - org

Miscellaneous Chores

  • npm version to 0.11.2 - (f96657f) - GitHub Action

0.11.2 - 2024-10-30

Bug Fixes

  • join docs in readme - (49d50af) - org

Miscellaneous Chores

  • npm version to 0.11.1 - (04e5532) - GitHub Action

0.11.1 - 2024-10-30

Bug Fixes

  • join docs in readme - (229cd6c) - org

Miscellaneous Chores

  • npm version to 0.11.0 - (b99d971) - GitHub Action

0.11.0 - 2024-10-29

Bug Fixes

  • merged - (7717814) - org

Features

  • implemented join class tests - (efc049d) - org
  • implemented join class - (c4499ce) - org
  • implemented join class - (1974cb2) - org

Miscellaneous Chores

  • npm version to 0.10.1 - (9a4b2a7) - GitHub Action

0.10.1 - 2024-10-28

Bug Fixes

  • readme refinements - (ec4be27) - org
  • readme refinements - (0c4a706) - org

Miscellaneous Chores

  • npm version to 0.10.0 - (dafb73f) - GitHub Action

0.10.0 - 2024-10-26

Features

  • renamed PubSub to Router and expanded to many->many - (29dd6a7) - org

Miscellaneous Chores

  • npm version to 0.9.0 - (ed97016) - GitHub Action

0.9.0 - 2024-10-26

Features

  • renamed PubSub to Router and expanded to many->many - (5512c41) - org

Miscellaneous Chores

  • npm version to 0.8.0 - (73910b9) - GitHub Action

0.8.0 - 2024-10-26

Features

  • readme refinement - (960c780) - org

Miscellaneous Chores

  • npm version to 0.7.0 - (b4fa792) - GitHub Action

0.7.0 - 2024-10-26

Features

  • readme refinement - (5cfd59a) - org

Miscellaneous Chores

  • npm version to 0.6.0 - (2ff1928) - GitHub Action

0.6.0 - 2024-10-26

Features

  • readme refinement - (4ab59a2) - org

Miscellaneous Chores

  • npm version to 0.5.0 - (fe51500) - GitHub Action

0.5.0 - 2024-10-26

Features

  • readme refinement - (8b0fd0b) - org
  • readme refinement - (7f483f8) - org

Miscellaneous Chores

  • npm version to 0.4.0 - (f22c45a) - GitHub Action

0.4.0 - 2024-10-26

Features

  • renamed to PubSub - (aa349df) - org

Miscellaneous Chores

  • npm version to 0.3.0 - (1cbe645) - GitHub Action

0.3.0 - 2024-10-26

Features

  • renamed to PubSub - (943a382) - org

Miscellaneous Chores

  • npm version to 0.2.2 - (ea9ca1a) - GitHub Action

0.2.2 - 2024-10-25

Bug Fixes

  • readme refinements - (3b404b8) - org

Miscellaneous Chores

  • npm version to 0.2.1 - (cc9982b) - GitHub Action

0.2.1 - 2024-10-25

Bug Fixes

  • test refinements - (5efbb62) - org
  • test batchSize - (50e48f7) - org
  • test batchSize - (1bd9bc3) - org

Miscellaneous Chores

  • npm version to 0.2.0 - (afe0af2) - GitHub Action

0.2.0 - 2024-10-25

Bug Fixes

  • test batchSize - (7dc0708) - org

Features

  • more elegant interface - (57c2b91) - org

Miscellaneous Chores

  • npm version to 0.1.5 - (adb0096) - GitHub Action

0.1.5 - 2024-10-25

Bug Fixes

  • change to MIT - (64729d4) - org

Miscellaneous Chores

  • npm version to 0.1.4 - (817d5f2) - GitHub Action

0.1.4 - 2024-10-25

Bug Fixes

  • change to MIT - (02c43c5) - org

Miscellaneous Chores

  • npm version to 0.1.3 - (547a053) - GitHub Action

0.1.3 - 2024-10-25

Bug Fixes

  • cicd versioning - (ec9b428) - org

Miscellaneous Chores

  • npm version to 0.1.2 - (b5011fd) - GitHub Action

0.1.2 - 2024-10-25

Bug Fixes

  • cicd versioning - (59beea9) - org

0.1.1 - 2024-10-25

Bug Fixes

  • cicd - (35cd444) - org

0.1.0 - 2024-10-25

Features

  • redis test container - (483fc2f) - org
  • cicd - (406110e) - org

0.0.0 - 2024-10-25

Features

  • cicd - (fca21f0) - org
  • pubsub complete - (bb29410) - org
  • pubsub complete - (889449f) - org
  • pubsub complete - (8d2fc70) - org
  • pubsub complete - (fb8a3b4) - org
  • init - (a504379) - org
  • init - (abcde83) - org

Changelog generated by cocogitto.