Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

rabbitmq-enterprise-toolkit

ahmetdlbs147MIT1.0.4TypeScript support: included

πŸš€ Enterprise-grade RabbitMQ wrapper for Node.js & TypeScript - High-throughput messaging with automatic retry, DLQ, batch processing & graceful shutdown. Production-ready AMQP client with zero message loss guarantee.

rabbitmq, amqp, message-queue, typescript, nodejs, microservices, high-throughput, enterprise, production-ready, retry-mechanism, dead-letter-queue, dlq, batch-processing, graceful-shutdown, connection-pooling, auto-reconnect, message-broker, pub-sub, publisher-subscriber, event-driven, async-messaging, reliable-messaging, zero-message-loss, idempotent, message-persistence, queue-management, scalable, performance, throughput, concurrent, type-safe, modern, wrapper, client, sdk, amqplib, messaging-patterns, producer-consumer, work-queues, task-queue, job-queue, background-jobs, distributed-systems, cloud-native, kubernetes, docker, monitoring, observability, fault-tolerant, resilient

readme

πŸš€ RabbitMQ Toolkit

Modern, type-safe RabbitMQ wrapper with built-in best practices for Node.js applications. Optimized for both small-scale and high-throughput enterprise applications.

Version TypeScript Production High-Throughput

✨ Features

πŸ›‘οΈ Reliability & Safety

  • βœ… Zero Message Loss: Persistent messages with durable queues
  • βœ… Auto-Reconnection: Built-in connection recovery with configurable delays
  • βœ… Dead Letter Queues: Failed messages are preserved and trackable
  • βœ… Manual ACK: Messages stay in queue until successfully processed
  • βœ… Graceful Shutdown: Clean application termination

⚑ Performance & Scalability

  • βœ… Lazy Queues: Optimized for high-throughput scenarios (50K+ msg/sec)
  • βœ… Batch Publishing: Automatic batching for improved performance
  • βœ… Configurable Prefetch: Memory and concurrency optimization
  • βœ… Retry Mechanism: Exponential backoff with customizable strategies
  • βœ… Connection Pooling: Efficient resource utilization

πŸ”§ Developer Experience

  • βœ… Full TypeScript: Complete type safety with comprehensive interfaces
  • βœ… Modular Architecture: Clean, extensible codebase
  • βœ… Comprehensive Error Handling: Detailed error types and contexts
  • βœ… Production Best Practices: Built-in enterprise patterns

πŸ“¦ Installation

npm install rabbitmq-enterprise-toolkit

πŸš€ Quick Start

import { createRabbitMQ } from 'rabbitmq-enterprise-toolkit';

const rabbitmq = createRabbitMQ({
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest'
  }
});

// Register consumer first (creates queue with proper settings)
await rabbitmq.registerQueue({
  queueName: 'orders',
  processor: async (data) => {
    console.log('Processing order:', data);
    await processOrder(data);
  },
  errCallback: async (error) => {
    console.error('Order processing failed:', error);
  }
});

// Publish messages
await rabbitmq.publish('orders', {
  orderId: '12345',
  customerId: 'user-456',
  items: [{ productId: 'p1', quantity: 2 }]
});

πŸ“‹ Complete Configuration Reference

πŸ”Œ Connection Configuration

interface ConnectionConfig {
  host: string;                    // RabbitMQ server host
  port: number;                    // RabbitMQ server port (default: 5672)
  username: string;                // Authentication username
  password: string;                // Authentication password
  vhost?: string;                  // Virtual host (default: '/')
  reconnectDelay?: number;         // Reconnection delay in ms (default: 5000)
  heartbeat?: number;              // Heartbeat interval in seconds (default: 60)
  ssl?: boolean;                   // Enable SSL/TLS (default: false)
}

βš™οΈ RabbitMQ Configuration

interface RabbitMQConfig {
  connection: ConnectionConfig;
  defaultPrefetchCount?: number;   // Global prefetch count (default: 10)
  gracefulShutdownTimeout?: number; // Shutdown timeout in ms (default: 10000)
  highThroughput?: boolean;        // Enable high-throughput optimizations
}

🎯 Queue Configuration

interface QueueOptions {
  queueName: string;               // Queue name (will be normalized)
  processor: (data: any) => Promise<void>; // Message processor function
  errCallback?: (error: Error) => Promise<void>; // Error handler
  prefetchCount?: number;          // Queue-specific prefetch (default: 10)
  retryOptions?: RetryOptions;     // Retry configuration
  durable?: boolean;               // Queue persistence (default: true)
  exclusive?: boolean;             // Exclusive queue (default: false)
  autoDelete?: boolean;            // Auto-delete when empty (default: false)
  deadLetterExchange?: string;     // Custom DLX name
  deadLetterRoutingKey?: string;   // Custom DLX routing key
}

πŸ”„ Retry Configuration

interface RetryOptions {
  maxRetries: number;              // Maximum retry attempts
  retryDelay: number;              // Initial delay in milliseconds
  delayType: 'fixed' | 'exponential'; // Retry delay strategy
  delay?: number;                  // Custom delay for exponential (default: retryDelay)
}

πŸ“€ Publishing Options

interface PublishOptions {
  persistent?: boolean;            // Message persistence (default: true)
  messageId?: string;              // Custom message ID (auto-generated if not provided)
  timestamp?: number;              // Custom timestamp (auto-generated if not provided)
  expiration?: string;             // Message TTL in milliseconds
  priority?: number;               // Message priority (0-255)
  batch?: boolean;                 // Enable batch publishing (default: true in high-throughput mode)
}

πŸ“Š Scenario-Based Recommendations

🐣 Small Scale Applications (< 1K messages/hour)

const rabbitmq = createRabbitMQ({
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest',
    reconnectDelay: 5000,
    heartbeat: 60
  },
  defaultPrefetchCount: 5,         // Low prefetch for resource efficiency
  gracefulShutdownTimeout: 10000
});

await rabbitmq.registerQueue({
  queueName: 'notifications',
  processor: async (data) => {
    await sendEmail(data);
  },
  prefetchCount: 3,                // Process 3 messages concurrently
  retryOptions: {
    maxRetries: 5,
    retryDelay: 2000,
    delayType: 'exponential'       // 2s, 4s, 8s, 16s, 32s
  }
});

🏒 Medium Scale Applications (1K-10K messages/hour)

const rabbitmq = createRabbitMQ({
  connection: {
    host: 'rabbitmq.company.com',
    port: 5672,
    username: process.env.RABBITMQ_USER,
    password: process.env.RABBITMQ_PASS,
    reconnectDelay: 3000,
    heartbeat: 30,
    ssl: true                      // Enable SSL for production
  },
  defaultPrefetchCount: 20,        // Balanced prefetch
  gracefulShutdownTimeout: 15000
});

await rabbitmq.registerQueue({
  queueName: 'order-processing',
  processor: async (data) => {
    await processOrder(data);
    await updateInventory(data);
    await sendConfirmation(data);
  },
  prefetchCount: 15,               // Process 15 orders concurrently
  retryOptions: {
    maxRetries: 3,
    retryDelay: 1500,
    delayType: 'exponential'
  }
});

πŸš€ High-Throughput Applications (10K+ messages/hour)

const rabbitmq = createRabbitMQ({
  connection: {
    host: 'rabbitmq-cluster.company.com',
    port: 5672,
    username: process.env.RABBITMQ_USER,
    password: process.env.RABBITMQ_PASS,
    reconnectDelay: 2000,
    heartbeat: 30,
    ssl: true
  },
  defaultPrefetchCount: 100,       // High prefetch for throughput
  gracefulShutdownTimeout: 30000,
  highThroughput: true             // Enable high-throughput optimizations
});

await rabbitmq.registerQueue({
  queueName: 'analytics-events',
  processor: async (data) => {
    await saveToDatabase(data);    // Fast database operations
  },
  prefetchCount: 200,              // Very high concurrent processing
  retryOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    delayType: 'fixed'             // Fixed delay for faster recovery
  }
});

// Batch publishing for maximum throughput
for (let i = 0; i < 100000; i++) {
  await rabbitmq.publish('analytics-events', {
    userId: i,
    event: 'page_view',
    timestamp: Date.now()
  }, {
    batch: true,                   // Automatic batching
    persistent: true
  });
}

πŸ”„ Retry Strategies Explained

retryOptions: {
  maxRetries: 5,
  retryDelay: 1000,
  delayType: 'exponential'
}
// Retry delays: 1s β†’ 2s β†’ 4s β†’ 8s β†’ 16s

Best for: API timeouts, temporary database locks, network hiccups

retryOptions: {
  maxRetries: 3,
  retryDelay: 2000,
  delayType: 'fixed'
}
// Retry delays: 2s β†’ 2s β†’ 2s

Best for: High-volume processing, consistent external service delays

Custom Exponential

retryOptions: {
  maxRetries: 4,
  retryDelay: 500,    // Initial delay
  delayType: 'exponential',
  delay: 2000         // Custom base for exponential calculation
}
// Retry delays: 2s β†’ 4s β†’ 8s β†’ 16s

πŸ› οΈ Advanced Usage Examples

Error Handling & Monitoring

await rabbitmq.registerQueue({
  queueName: 'critical-operations',
  processor: async (data) => {
    const startTime = Date.now();
    try {
      await performCriticalOperation(data);
      console.log(`βœ… Operation completed in ${Date.now() - startTime}ms`);
    } catch (error) {
      console.error(`❌ Operation failed for ${data.id}:`, error);

      // Send alert for critical failures
      if (error.code === 'CRITICAL_ERROR') {
        await sendAlert(`Critical operation failed: ${data.id}`);
      }

      throw error; // Re-throw to trigger retry mechanism
    }
  },
  errCallback: async (error, data) => {
    // Log all errors for monitoring
    console.error('Error details:', {
      error: error.message,
      messageId: data?.id,
      timestamp: new Date().toISOString(),
      stack: error.stack
    });

    // Send to monitoring system
    await metrics.increment('message_processing_errors');
  },
  retryOptions: {
    maxRetries: 5,
    retryDelay: 2000,
    delayType: 'exponential'
  }
});

Dead Letter Queue Monitoring

// Monitor failed messages
await rabbitmq.registerQueue({
  queueName: 'orders.dlq',
  processor: async (data) => {
    console.log('🚨 CRITICAL: Order permanently failed:', data);

    // Alert operations team
    await sendSlackAlert(`❌ Order ${data.orderId} failed permanently after all retries`);

    // Store in failed orders table for manual review
    await database.failedOrders.create({
      orderId: data.orderId,
      originalData: data,
      failureTime: new Date(),
      reason: 'Max retries exceeded'
    });
  }
});

Multiple Queue Management

const queueConfigs = [
  {
    name: 'high-priority-orders',
    prefetch: 50,
    retries: 5
  },
  {
    name: 'normal-orders',
    prefetch: 20,
    retries: 3
  },
  {
    name: 'background-tasks',
    prefetch: 5,
    retries: 2
  }
];

for (const config of queueConfigs) {
  await rabbitmq.registerQueue({
    queueName: config.name,
    processor: async (data) => {
      await processBasedOnQueue(config.name, data);
    },
    prefetchCount: config.prefetch,
    retryOptions: {
      maxRetries: config.retries,
      retryDelay: 1000,
      delayType: 'exponential'
    }
  });
}

Graceful Shutdown Implementation

// Production-ready shutdown handling
let isShuttingDown = false;

process.on('SIGTERM', handleShutdown);
process.on('SIGINT', handleShutdown);

async function handleShutdown(signal: string) {
  if (isShuttingDown) return;
  isShuttingDown = true;

  console.log(`πŸ“₯ Received ${signal}. Starting graceful shutdown...`);

  // Stop accepting new work
  server.close();

  // Close RabbitMQ connections gracefully
  await rabbitmq.close();

  console.log('βœ… Graceful shutdown completed');
  process.exit(0);
}

πŸ”’ Production Deployment

Environment Configuration

// config/production.ts
const config = {
  rabbitmq: {
    connection: {
      host: process.env.RABBITMQ_HOST,
      port: parseInt(process.env.RABBITMQ_PORT || '5672'),
      username: process.env.RABBITMQ_USERNAME,
      password: process.env.RABBITMQ_PASSWORD,
      vhost: process.env.RABBITMQ_VHOST || '/',
      ssl: process.env.NODE_ENV === 'production',
      heartbeat: 30,
      reconnectDelay: 3000
    },
    defaultPrefetchCount: parseInt(process.env.RABBITMQ_PREFETCH || '20'),
    gracefulShutdownTimeout: 30000
  }
};

Docker Compose for Development

# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: rabbitmq-dev
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
      - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.8
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    restart: unless-stopped

volumes:
  rabbitmq_data:

Health Check Implementation

// Health check endpoint
app.get('/health', (req, res) => {
  const health = {
    status: rabbitmq.isConnected() ? 'healthy' : 'unhealthy',
    timestamp: new Date().toISOString(),
    uptime: process.uptime(),
    memory: process.memoryUsage(),
    connection: rabbitmq.getConnectionInfo()
  };

  const statusCode = health.status === 'healthy' ? 200 : 503;
  res.status(statusCode).json(health);
});

πŸ“ˆ Performance Tuning

Memory Optimization

// For high-memory usage scenarios
const rabbitmq = createRabbitMQ({
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest',
    heartbeat: 30
  },
  defaultPrefetchCount: 50,        // Adjust based on available memory
  highThroughput: true
});

// Queues automatically configured with:
// - Lazy mode (disk-based storage)
// - Max length limits (1M messages)
// - Overflow protection

CPU Optimization

// Distribute load across multiple consumers
const workers = require('os').cpus().length;

for (let i = 0; i < workers; i++) {
  await rabbitmq.registerQueue({
    queueName: `worker-queue-${i}`,
    processor: async (data) => {
      await processCPUIntensiveTask(data);
    },
    prefetchCount: 10
  });
}

Network Optimization

// Batch publishing for network efficiency
const messages = generateLargeDataSet();

for (const message of messages) {
  await rabbitmq.publish('bulk-queue', message, {
    batch: true,           // Automatic batching (100 messages or 50ms)
    persistent: true
  });
}

πŸ§ͺ Testing

Unit Testing Example

// tests/rabbitmq.test.ts
import { createRabbitMQ } from 'rabbitmq-enterprise-toolkit';

describe('RabbitMQ Toolkit', () => {
  let rabbitmq: RabbitMQInstance;

  beforeEach(() => {
    rabbitmq = createRabbitMQ({
      connection: {
        host: 'localhost',
        port: 5672,
        username: 'guest',
        password: 'guest'
      }
    });
  });

  afterEach(async () => {
    await rabbitmq.close();
  });

  it('should process messages successfully', async () => {
    const processedMessages: any[] = [];

    await rabbitmq.registerQueue({
      queueName: 'test-queue',
      processor: async (data) => {
        processedMessages.push(data);
      }
    });

    await rabbitmq.publish('test-queue', { id: 1, test: true });

    // Wait for processing
    await new Promise(resolve => setTimeout(resolve, 1000));

    expect(processedMessages).toHaveLength(1);
    expect(processedMessages[0]).toEqual({ id: 1, test: true });
  });
});

Integration Testing

# Start test environment
docker-compose -f docker-compose.test.yml up -d

# Run tests
npm test

# Run specific example
npm run example:basic
npm run example:high-throughput

🚨 Troubleshooting

Common Issues

Queue Precondition Failed

# Clean existing queues
docker exec -it rabbitmq-container rabbitmqctl list_queues
docker exec -it rabbitmq-container rabbitmqctl delete_queue queue_name

Memory Issues

// Reduce prefetch count
prefetchCount: 5,

// Enable lazy queues (automatic in high-throughput mode)
highThroughput: true

Connection Issues

// Increase reconnection frequency
reconnectDelay: 2000,

// Reduce heartbeat interval
heartbeat: 30

Monitoring Commands

# Check RabbitMQ status
docker exec rabbitmq-container rabbitmqctl status

# List queues and message counts
docker exec rabbitmq-container rabbitmqctl list_queues name messages

# Monitor connections
docker exec rabbitmq-container rabbitmqctl list_connections

# Check memory usage
docker stats rabbitmq-container

πŸ”— API Reference

createRabbitMQ(config)

Creates a new RabbitMQ instance with the provided configuration.

rabbitmq.registerQueue(options)

Registers a queue with consumer and sets up processing pipeline.

rabbitmq.publish(queueName, data, options?)

Publishes a message to the specified queue.

rabbitmq.isConnected()

Returns boolean indicating if the connection is active.

rabbitmq.getConnectionInfo()

Returns current connection configuration or null if disconnected.

rabbitmq.close()

Gracefully closes all connections and cleans up resources.

🀝 Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

πŸ“„ License

MIT License - see the LICENSE file for details.

🎯 Performance Benchmarks

Scenario Messages/sec Memory Usage Recommended Use
Small Scale 100-1,000 Low Development, small apps
Medium Scale 1,000-10,000 Medium Production apps
High-Throughput 10,000-100,000+ Low (lazy queues) Enterprise, analytics

Built with ❀️ for production-grade applications