π RabbitMQ Toolkit
Modern, type-safe RabbitMQ wrapper with built-in best practices for Node.js applications. Optimized for both small-scale and high-throughput enterprise applications.
β¨ Features
π‘οΈ Reliability & Safety
- β Zero Message Loss: Persistent messages with durable queues
- β Auto-Reconnection: Built-in connection recovery with configurable delays
- β Dead Letter Queues: Failed messages are preserved and trackable
- β Manual ACK: Messages stay in queue until successfully processed
- β Graceful Shutdown: Clean application termination
β‘ Performance & Scalability
- β Lazy Queues: Optimized for high-throughput scenarios (50K+ msg/sec)
- β Batch Publishing: Automatic batching for improved performance
- β Configurable Prefetch: Memory and concurrency optimization
- β Retry Mechanism: Exponential backoff with customizable strategies
- β Connection Pooling: Efficient resource utilization
π§ Developer Experience
- β Full TypeScript: Complete type safety with comprehensive interfaces
- β Modular Architecture: Clean, extensible codebase
- β Comprehensive Error Handling: Detailed error types and contexts
- β Production Best Practices: Built-in enterprise patterns
π¦ Installation
npm install rabbitmq-enterprise-toolkit
π Quick Start
import { createRabbitMQ } from 'rabbitmq-enterprise-toolkit';
const rabbitmq = createRabbitMQ({
connection: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest'
}
});
// Register consumer first (creates queue with proper settings)
await rabbitmq.registerQueue({
queueName: 'orders',
processor: async (data) => {
console.log('Processing order:', data);
await processOrder(data);
},
errCallback: async (error) => {
console.error('Order processing failed:', error);
}
});
// Publish messages
await rabbitmq.publish('orders', {
orderId: '12345',
customerId: 'user-456',
items: [{ productId: 'p1', quantity: 2 }]
});
π Complete Configuration Reference
π Connection Configuration
interface ConnectionConfig {
host: string; // RabbitMQ server host
port: number; // RabbitMQ server port (default: 5672)
username: string; // Authentication username
password: string; // Authentication password
vhost?: string; // Virtual host (default: '/')
reconnectDelay?: number; // Reconnection delay in ms (default: 5000)
heartbeat?: number; // Heartbeat interval in seconds (default: 60)
ssl?: boolean; // Enable SSL/TLS (default: false)
}
βοΈ RabbitMQ Configuration
interface RabbitMQConfig {
connection: ConnectionConfig;
defaultPrefetchCount?: number; // Global prefetch count (default: 10)
gracefulShutdownTimeout?: number; // Shutdown timeout in ms (default: 10000)
highThroughput?: boolean; // Enable high-throughput optimizations
}
π― Queue Configuration
interface QueueOptions {
queueName: string; // Queue name (will be normalized)
processor: (data: any) => Promise<void>; // Message processor function
errCallback?: (error: Error) => Promise<void>; // Error handler
prefetchCount?: number; // Queue-specific prefetch (default: 10)
retryOptions?: RetryOptions; // Retry configuration
durable?: boolean; // Queue persistence (default: true)
exclusive?: boolean; // Exclusive queue (default: false)
autoDelete?: boolean; // Auto-delete when empty (default: false)
deadLetterExchange?: string; // Custom DLX name
deadLetterRoutingKey?: string; // Custom DLX routing key
}
π Retry Configuration
interface RetryOptions {
maxRetries: number; // Maximum retry attempts
retryDelay: number; // Initial delay in milliseconds
delayType: 'fixed' | 'exponential'; // Retry delay strategy
delay?: number; // Custom delay for exponential (default: retryDelay)
}
π€ Publishing Options
interface PublishOptions {
persistent?: boolean; // Message persistence (default: true)
messageId?: string; // Custom message ID (auto-generated if not provided)
timestamp?: number; // Custom timestamp (auto-generated if not provided)
expiration?: string; // Message TTL in milliseconds
priority?: number; // Message priority (0-255)
batch?: boolean; // Enable batch publishing (default: true in high-throughput mode)
}
π Scenario-Based Recommendations
π£ Small Scale Applications (< 1K messages/hour)
const rabbitmq = createRabbitMQ({
connection: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
reconnectDelay: 5000,
heartbeat: 60
},
defaultPrefetchCount: 5, // Low prefetch for resource efficiency
gracefulShutdownTimeout: 10000
});
await rabbitmq.registerQueue({
queueName: 'notifications',
processor: async (data) => {
await sendEmail(data);
},
prefetchCount: 3, // Process 3 messages concurrently
retryOptions: {
maxRetries: 5,
retryDelay: 2000,
delayType: 'exponential' // 2s, 4s, 8s, 16s, 32s
}
});
π’ Medium Scale Applications (1K-10K messages/hour)
const rabbitmq = createRabbitMQ({
connection: {
host: 'rabbitmq.company.com',
port: 5672,
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASS,
reconnectDelay: 3000,
heartbeat: 30,
ssl: true // Enable SSL for production
},
defaultPrefetchCount: 20, // Balanced prefetch
gracefulShutdownTimeout: 15000
});
await rabbitmq.registerQueue({
queueName: 'order-processing',
processor: async (data) => {
await processOrder(data);
await updateInventory(data);
await sendConfirmation(data);
},
prefetchCount: 15, // Process 15 orders concurrently
retryOptions: {
maxRetries: 3,
retryDelay: 1500,
delayType: 'exponential'
}
});
π High-Throughput Applications (10K+ messages/hour)
const rabbitmq = createRabbitMQ({
connection: {
host: 'rabbitmq-cluster.company.com',
port: 5672,
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASS,
reconnectDelay: 2000,
heartbeat: 30,
ssl: true
},
defaultPrefetchCount: 100, // High prefetch for throughput
gracefulShutdownTimeout: 30000,
highThroughput: true // Enable high-throughput optimizations
});
await rabbitmq.registerQueue({
queueName: 'analytics-events',
processor: async (data) => {
await saveToDatabase(data); // Fast database operations
},
prefetchCount: 200, // Very high concurrent processing
retryOptions: {
maxRetries: 3,
retryDelay: 1000,
delayType: 'fixed' // Fixed delay for faster recovery
}
});
// Batch publishing for maximum throughput
for (let i = 0; i < 100000; i++) {
await rabbitmq.publish('analytics-events', {
userId: i,
event: 'page_view',
timestamp: Date.now()
}, {
batch: true, // Automatic batching
persistent: true
});
}
π Retry Strategies Explained
Exponential Backoff (Recommended for transient errors)
retryOptions: {
maxRetries: 5,
retryDelay: 1000,
delayType: 'exponential'
}
// Retry delays: 1s β 2s β 4s β 8s β 16s
Best for: API timeouts, temporary database locks, network hiccups
Fixed Delay (Recommended for high-throughput)
retryOptions: {
maxRetries: 3,
retryDelay: 2000,
delayType: 'fixed'
}
// Retry delays: 2s β 2s β 2s
Best for: High-volume processing, consistent external service delays
Custom Exponential
retryOptions: {
maxRetries: 4,
retryDelay: 500, // Initial delay
delayType: 'exponential',
delay: 2000 // Custom base for exponential calculation
}
// Retry delays: 2s β 4s β 8s β 16s
π οΈ Advanced Usage Examples
Error Handling & Monitoring
await rabbitmq.registerQueue({
queueName: 'critical-operations',
processor: async (data) => {
const startTime = Date.now();
try {
await performCriticalOperation(data);
console.log(`β
Operation completed in ${Date.now() - startTime}ms`);
} catch (error) {
console.error(`β Operation failed for ${data.id}:`, error);
// Send alert for critical failures
if (error.code === 'CRITICAL_ERROR') {
await sendAlert(`Critical operation failed: ${data.id}`);
}
throw error; // Re-throw to trigger retry mechanism
}
},
errCallback: async (error, data) => {
// Log all errors for monitoring
console.error('Error details:', {
error: error.message,
messageId: data?.id,
timestamp: new Date().toISOString(),
stack: error.stack
});
// Send to monitoring system
await metrics.increment('message_processing_errors');
},
retryOptions: {
maxRetries: 5,
retryDelay: 2000,
delayType: 'exponential'
}
});
Dead Letter Queue Monitoring
// Monitor failed messages
await rabbitmq.registerQueue({
queueName: 'orders.dlq',
processor: async (data) => {
console.log('π¨ CRITICAL: Order permanently failed:', data);
// Alert operations team
await sendSlackAlert(`β Order ${data.orderId} failed permanently after all retries`);
// Store in failed orders table for manual review
await database.failedOrders.create({
orderId: data.orderId,
originalData: data,
failureTime: new Date(),
reason: 'Max retries exceeded'
});
}
});
Multiple Queue Management
const queueConfigs = [
{
name: 'high-priority-orders',
prefetch: 50,
retries: 5
},
{
name: 'normal-orders',
prefetch: 20,
retries: 3
},
{
name: 'background-tasks',
prefetch: 5,
retries: 2
}
];
for (const config of queueConfigs) {
await rabbitmq.registerQueue({
queueName: config.name,
processor: async (data) => {
await processBasedOnQueue(config.name, data);
},
prefetchCount: config.prefetch,
retryOptions: {
maxRetries: config.retries,
retryDelay: 1000,
delayType: 'exponential'
}
});
}
Graceful Shutdown Implementation
// Production-ready shutdown handling
let isShuttingDown = false;
process.on('SIGTERM', handleShutdown);
process.on('SIGINT', handleShutdown);
async function handleShutdown(signal: string) {
if (isShuttingDown) return;
isShuttingDown = true;
console.log(`π₯ Received ${signal}. Starting graceful shutdown...`);
// Stop accepting new work
server.close();
// Close RabbitMQ connections gracefully
await rabbitmq.close();
console.log('β
Graceful shutdown completed');
process.exit(0);
}
π Production Deployment
Environment Configuration
// config/production.ts
const config = {
rabbitmq: {
connection: {
host: process.env.RABBITMQ_HOST,
port: parseInt(process.env.RABBITMQ_PORT || '5672'),
username: process.env.RABBITMQ_USERNAME,
password: process.env.RABBITMQ_PASSWORD,
vhost: process.env.RABBITMQ_VHOST || '/',
ssl: process.env.NODE_ENV === 'production',
heartbeat: 30,
reconnectDelay: 3000
},
defaultPrefetchCount: parseInt(process.env.RABBITMQ_PREFETCH || '20'),
gracefulShutdownTimeout: 30000
}
};
Docker Compose for Development
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
hostname: rabbitmq-dev
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=password
- RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.8
volumes:
- rabbitmq_data:/var/lib/rabbitmq
restart: unless-stopped
volumes:
rabbitmq_data:
Health Check Implementation
// Health check endpoint
app.get('/health', (req, res) => {
const health = {
status: rabbitmq.isConnected() ? 'healthy' : 'unhealthy',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
memory: process.memoryUsage(),
connection: rabbitmq.getConnectionInfo()
};
const statusCode = health.status === 'healthy' ? 200 : 503;
res.status(statusCode).json(health);
});
π Performance Tuning
Memory Optimization
// For high-memory usage scenarios
const rabbitmq = createRabbitMQ({
connection: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
heartbeat: 30
},
defaultPrefetchCount: 50, // Adjust based on available memory
highThroughput: true
});
// Queues automatically configured with:
// - Lazy mode (disk-based storage)
// - Max length limits (1M messages)
// - Overflow protection
CPU Optimization
// Distribute load across multiple consumers
const workers = require('os').cpus().length;
for (let i = 0; i < workers; i++) {
await rabbitmq.registerQueue({
queueName: `worker-queue-${i}`,
processor: async (data) => {
await processCPUIntensiveTask(data);
},
prefetchCount: 10
});
}
Network Optimization
// Batch publishing for network efficiency
const messages = generateLargeDataSet();
for (const message of messages) {
await rabbitmq.publish('bulk-queue', message, {
batch: true, // Automatic batching (100 messages or 50ms)
persistent: true
});
}
π§ͺ Testing
Unit Testing Example
// tests/rabbitmq.test.ts
import { createRabbitMQ } from 'rabbitmq-enterprise-toolkit';
describe('RabbitMQ Toolkit', () => {
let rabbitmq: RabbitMQInstance;
beforeEach(() => {
rabbitmq = createRabbitMQ({
connection: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest'
}
});
});
afterEach(async () => {
await rabbitmq.close();
});
it('should process messages successfully', async () => {
const processedMessages: any[] = [];
await rabbitmq.registerQueue({
queueName: 'test-queue',
processor: async (data) => {
processedMessages.push(data);
}
});
await rabbitmq.publish('test-queue', { id: 1, test: true });
// Wait for processing
await new Promise(resolve => setTimeout(resolve, 1000));
expect(processedMessages).toHaveLength(1);
expect(processedMessages[0]).toEqual({ id: 1, test: true });
});
});
Integration Testing
# Start test environment
docker-compose -f docker-compose.test.yml up -d
# Run tests
npm test
# Run specific example
npm run example:basic
npm run example:high-throughput
π¨ Troubleshooting
Common Issues
Queue Precondition Failed
# Clean existing queues
docker exec -it rabbitmq-container rabbitmqctl list_queues
docker exec -it rabbitmq-container rabbitmqctl delete_queue queue_name
Memory Issues
// Reduce prefetch count
prefetchCount: 5,
// Enable lazy queues (automatic in high-throughput mode)
highThroughput: true
Connection Issues
// Increase reconnection frequency
reconnectDelay: 2000,
// Reduce heartbeat interval
heartbeat: 30
Monitoring Commands
# Check RabbitMQ status
docker exec rabbitmq-container rabbitmqctl status
# List queues and message counts
docker exec rabbitmq-container rabbitmqctl list_queues name messages
# Monitor connections
docker exec rabbitmq-container rabbitmqctl list_connections
# Check memory usage
docker stats rabbitmq-container
π API Reference
createRabbitMQ(config)
Creates a new RabbitMQ instance with the provided configuration.
rabbitmq.registerQueue(options)
Registers a queue with consumer and sets up processing pipeline.
rabbitmq.publish(queueName, data, options?)
Publishes a message to the specified queue.
rabbitmq.isConnected()
Returns boolean indicating if the connection is active.
rabbitmq.getConnectionInfo()
Returns current connection configuration or null if disconnected.
rabbitmq.close()
Gracefully closes all connections and cleans up resources.
π€ Contributing
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
π License
MIT License - see the LICENSE file for details.
π― Performance Benchmarks
Scenario | Messages/sec | Memory Usage | Recommended Use |
---|---|---|---|
Small Scale | 100-1,000 | Low | Development, small apps |
Medium Scale | 1,000-10,000 | Medium | Production apps |
High-Throughput | 10,000-100,000+ | Low (lazy queues) | Enterprise, analytics |
Built with β€οΈ for production-grade applications