Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

smart-task-queue

iakuf60MIT1.0.1

智能任务队列系统,支持自动并发控制、任务去重、指数退避重试和实时性能监控

task queue, concurrency control, job scheduler, retry strategy

readme

SmartTaskQueue

一个高性能、功能丰富的智能任务队列管理库,专为Node.js应用设计,具备自动扩缩容、并发控制、任务去重和失败重试等特性。

特性

  • 智能并发控制:自动根据负载调整并发数量,避免资源过载
  • 任务去重:基于任务ID自动去重,防止重复处理
  • 失败自动重试:内置指数退避算法,优雅处理临时错误
  • 事件驱动:完整的事件系统,方便监控和扩展
  • 性能监控:内置吞吐量计算和队列状态监控
  • 队列溢出保护:防止内存溢出,自动拒绝过多任务
  • 可扩展性:简单的继承模型,易于定制业务逻辑

安装

npm install smart-task-queue
# 或使用 yarn
yarn add smart-task-queue
# 或使用 pnpm
pnpm add smart-task-queue

基本用法

创建自定义队列

import SmartTaskQueue from 'smart-task-queue';

// 创建自定义队列类
class MyQueue extends SmartTaskQueue {
  // 必须实现此方法来处理任务
  async processTask(payload) {
    // 实现您的业务逻辑
    console.log(`处理任务: ${payload.id}`);
    return { processed: true, data: payload };
  }
}

// 实例化队列
const queue = new MyQueue({
  maxConcurrency: 100,  // 最大并发数
  minConcurrency: 10,   // 最小并发数
  taskExpireTime: 60000 // 任务去重过期时间(毫秒)
});

// 监听事件
queue.on('success', ({ taskId, result }) => {
  console.log(`任务 ${taskId} 成功完成:`, result);
});

queue.on('failure', ({ taskId, error }) => {
  console.error(`任务 ${taskId} 失败:`, error);
});

添加任务

// 添加简单任务
async function runTask() {
  try {
    const result = await queue.add(
      'unique-task-id',      // 任务唯一ID
      () => fetchData(),     // 任务函数
      3                      // 重试次数(可选,默认3次)
    );
    console.log('任务结果:', result);
  } catch (error) {
    console.error('任务最终失败:', error);
  }
}

// 批量添加任务
async function processBatch(items) {
  const promises = items.map((item, index) => {
    return queue.add(
      `task-${item.id}`,
      () => processItem(item),
      2
    );
  });

  // 等待所有任务完成
  const results = await Promise.allSettled(promises);
  console.log(`完成: ${results.filter(r => r.status === 'fulfilled').length}`);
  console.log(`失败: ${results.filter(r => r.status === 'rejected').length}`);
}

使用事件系统

// 监听队列状态变化
queue.on('monitor', (status) => {
  console.log('队列状态:', status);
  // 例如: 记录指标到监控系统
});

// 监听并发变化
queue.on('concurrencyChange', (newValue) => {
  console.log(`并发数已调整为: ${newValue}`);
});

// 监听任务重复
queue.on('duplicate', (taskId) => {
  console.log(`任务 ${taskId} 被去重机制拦截`);
});

// 监听队列溢出
queue.on('reject', ({ taskId, error }) => {
  console.error(`任务 ${taskId} 被拒绝: ${error.message}`);
});

API 文档

构造函数选项

const queue = new MyQueue({
  maxConcurrency: 100,    // 最大并发数
  minConcurrency: 10,     // 最小并发数
  maxQueue: 10000,        // 最大队列长度
  monitorInterval: 5000,  // 监控间隔(毫秒)
  taskExpireTime: 60000,  // 任务去重过期时间(毫秒)
  autoStart: true         // 是否自动启动队列
});

方法

方法 描述
add(taskId, taskFn, retries = 3) 添加任务到队列
start() 启动队列监控
stop() 停止队列监控
setConcurrency(newConcurrency) 动态调整并发数
getStatus() 获取当前队列状态
processTask(payload) 需子类实现的任务处理方法

事件

事件 参数 描述
success { taskId, result } 任务成功完成
failure { taskId, error } 任务失败
duplicate taskId 检测到重复任务
reject { taskId, error } 任务被拒绝(队列溢出)
monitor statusObject 队列状态更新
concurrencyChange newValue 并发数变化
taskSuccess { id, result } 子类任务处理成功
taskError { id, error } 子类任务处理失败

高级用法

自定义任务处理逻辑

class DatabaseQueue extends SmartTaskQueue {
  constructor(options = {}) {
    super(options);
    this.db = options.database;
  }

  async processTask(payload) {
    const { operation, data } = payload;

    switch (operation) {
      case 'insert':
        return await this.db.insert(data);
      case 'update':
        return await this.db.update(data.id, data);
      case 'delete':
        return await this.db.delete(data.id);
      default:
        throw new Error(`未知操作: ${operation}`);
    }
  }
}

// 使用
const dbQueue = new DatabaseQueue({
  database: myDatabaseConnection,
  maxConcurrency: 50
});

// 发送任务
dbQueue.emit('task', {
  id: 'insert-user-123',
  operation: 'insert',
  data: { name: '张三', email: 'zhangsan@example.com' }
});

动态调整并发数

// 根据系统负载调整并发数
function adjustConcurrency() {
  const cpuUsage = getCPUUsage(); // 假设的函数

  if (cpuUsage > 80) {
    // CPU负载高,降低并发
    queue.setConcurrency(20);
  } else if (cpuUsage < 30) {
    // CPU负载低,提高并发
    queue.setConcurrency(150);
  }
}

// 每10秒检查一次
setInterval(adjustConcurrency, 10000);

性能优化建议

  1. 合理设置并发数:根据任务类型和系统资源设置合适的maxConcurrencyminConcurrency
  2. 任务函数优化:确保任务函数内部高效,避免不必要的阻塞操作
  3. 适当的去重时间:根据业务需求设置合适的taskExpireTime,过长会占用更多内存
  4. 监控队列状态:定期检查getStatus()返回的指标,特别是吞吐量和失败率
  5. 合理使用重试:对于可重试的临时错误设置重试,对于永久性错误避免无意义重试

许可证

MIT