SmartTaskQueue
一个高性能、功能丰富的智能任务队列管理库,专为Node.js应用设计,具备自动扩缩容、并发控制、任务去重和失败重试等特性。
特性
- 智能并发控制:自动根据负载调整并发数量,避免资源过载
- 任务去重:基于任务ID自动去重,防止重复处理
- 失败自动重试:内置指数退避算法,优雅处理临时错误
- 事件驱动:完整的事件系统,方便监控和扩展
- 性能监控:内置吞吐量计算和队列状态监控
- 队列溢出保护:防止内存溢出,自动拒绝过多任务
- 可扩展性:简单的继承模型,易于定制业务逻辑
安装
npm install smart-task-queue
# 或使用 yarn
yarn add smart-task-queue
# 或使用 pnpm
pnpm add smart-task-queue
基本用法
创建自定义队列
import SmartTaskQueue from 'smart-task-queue';
// 创建自定义队列类
class MyQueue extends SmartTaskQueue {
// 必须实现此方法来处理任务
async processTask(payload) {
// 实现您的业务逻辑
console.log(`处理任务: ${payload.id}`);
return { processed: true, data: payload };
}
}
// 实例化队列
const queue = new MyQueue({
maxConcurrency: 100, // 最大并发数
minConcurrency: 10, // 最小并发数
taskExpireTime: 60000 // 任务去重过期时间(毫秒)
});
// 监听事件
queue.on('success', ({ taskId, result }) => {
console.log(`任务 ${taskId} 成功完成:`, result);
});
queue.on('failure', ({ taskId, error }) => {
console.error(`任务 ${taskId} 失败:`, error);
});
添加任务
// 添加简单任务
async function runTask() {
try {
const result = await queue.add(
'unique-task-id', // 任务唯一ID
() => fetchData(), // 任务函数
3 // 重试次数(可选,默认3次)
);
console.log('任务结果:', result);
} catch (error) {
console.error('任务最终失败:', error);
}
}
// 批量添加任务
async function processBatch(items) {
const promises = items.map((item, index) => {
return queue.add(
`task-${item.id}`,
() => processItem(item),
2
);
});
// 等待所有任务完成
const results = await Promise.allSettled(promises);
console.log(`完成: ${results.filter(r => r.status === 'fulfilled').length}`);
console.log(`失败: ${results.filter(r => r.status === 'rejected').length}`);
}
使用事件系统
// 监听队列状态变化
queue.on('monitor', (status) => {
console.log('队列状态:', status);
// 例如: 记录指标到监控系统
});
// 监听并发变化
queue.on('concurrencyChange', (newValue) => {
console.log(`并发数已调整为: ${newValue}`);
});
// 监听任务重复
queue.on('duplicate', (taskId) => {
console.log(`任务 ${taskId} 被去重机制拦截`);
});
// 监听队列溢出
queue.on('reject', ({ taskId, error }) => {
console.error(`任务 ${taskId} 被拒绝: ${error.message}`);
});
API 文档
构造函数选项
const queue = new MyQueue({
maxConcurrency: 100, // 最大并发数
minConcurrency: 10, // 最小并发数
maxQueue: 10000, // 最大队列长度
monitorInterval: 5000, // 监控间隔(毫秒)
taskExpireTime: 60000, // 任务去重过期时间(毫秒)
autoStart: true // 是否自动启动队列
});
方法
方法 | 描述 |
---|---|
add(taskId, taskFn, retries = 3) |
添加任务到队列 |
start() |
启动队列监控 |
stop() |
停止队列监控 |
setConcurrency(newConcurrency) |
动态调整并发数 |
getStatus() |
获取当前队列状态 |
processTask(payload) |
需子类实现的任务处理方法 |
事件
事件 | 参数 | 描述 |
---|---|---|
success |
{ taskId, result } |
任务成功完成 |
failure |
{ taskId, error } |
任务失败 |
duplicate |
taskId |
检测到重复任务 |
reject |
{ taskId, error } |
任务被拒绝(队列溢出) |
monitor |
statusObject |
队列状态更新 |
concurrencyChange |
newValue |
并发数变化 |
taskSuccess |
{ id, result } |
子类任务处理成功 |
taskError |
{ id, error } |
子类任务处理失败 |
高级用法
自定义任务处理逻辑
class DatabaseQueue extends SmartTaskQueue {
constructor(options = {}) {
super(options);
this.db = options.database;
}
async processTask(payload) {
const { operation, data } = payload;
switch (operation) {
case 'insert':
return await this.db.insert(data);
case 'update':
return await this.db.update(data.id, data);
case 'delete':
return await this.db.delete(data.id);
default:
throw new Error(`未知操作: ${operation}`);
}
}
}
// 使用
const dbQueue = new DatabaseQueue({
database: myDatabaseConnection,
maxConcurrency: 50
});
// 发送任务
dbQueue.emit('task', {
id: 'insert-user-123',
operation: 'insert',
data: { name: '张三', email: 'zhangsan@example.com' }
});
动态调整并发数
// 根据系统负载调整并发数
function adjustConcurrency() {
const cpuUsage = getCPUUsage(); // 假设的函数
if (cpuUsage > 80) {
// CPU负载高,降低并发
queue.setConcurrency(20);
} else if (cpuUsage < 30) {
// CPU负载低,提高并发
queue.setConcurrency(150);
}
}
// 每10秒检查一次
setInterval(adjustConcurrency, 10000);
性能优化建议
- 合理设置并发数:根据任务类型和系统资源设置合适的
maxConcurrency
和minConcurrency
- 任务函数优化:确保任务函数内部高效,避免不必要的阻塞操作
- 适当的去重时间:根据业务需求设置合适的
taskExpireTime
,过长会占用更多内存 - 监控队列状态:定期检查
getStatus()
返回的指标,特别是吞吐量和失败率 - 合理使用重试:对于可重试的临时错误设置重试,对于永久性错误避免无意义重试
许可证
MIT