Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

worker-threads-manager

ClassicDP21MIT1.0.2TypeScript support: included

A TypeScript library for managing worker threads with serialization.

typescript, worker-threads, serialization, parallel-processing, worker, threads, parallelism, multithreading

readme

Worker Threads Manager

Worker Threads Manager is a TypeScript library designed to simplify the management of worker threads in Node.js, enabling asynchronous method invocation between threads with strong typing, serialization/deserialization of data, and robust error handling. This library also supports the registration and use of custom classes, including those with recursively nested data.

Features

  • Strong Typing: The library enforces strict typing across worker threads, ensuring type safety and reducing runtime errors.
  • Serialization and Deserialization: Complex data structures, including custom classes and recursively nested objects, are automatically serialized before being sent to worker threads and deserialized upon receipt.
  • Error Handling: Includes built-in mechanisms for timeout management and error handling in asynchronous method invocations.
  • Class Registration: Allows for the registration of custom classes so that they can be properly serialized and deserialized during inter-thread communication.

Installation

npm install worker-threads-manager

Basic Usage

1. Define Handlers and Worker Controller

Handlers are defined in the worker thread. These handlers are functions that can be called from the main thread.

import { WorkerController } from "../src";
import { Handlers } from "../src/interfaces";

export class Matrix {
    a: number;
    constructor(x: number) {
        this.a = x;
    }

    addMatrix(m: Matrix) {
        this.a += m.a;
        return this;
    }
}

export class WorkerHandlers implements Handlers {
    processData(data: string): string {
        return `Processed: ${data}`;
    }

    calculateSum(a: number, b: number): number {
        return a + b;
    }

    matrixOperations(m1: Matrix, m2: Matrix) {
        return m1.addMatrix(m2);
    }
}

WorkerController.initialize(new WorkerHandlers());
WorkerController.registerClasses([Matrix]);

2. Use the Worker Manager in the Main Thread

The WorkerManager is used in the main thread to create worker threads, invoke methods in the worker, and manage their lifecycle.

import { WorkerManager } from "../src";
import { Matrix, WorkerHandlers } from "./worker";
import { resolve } from "path";

(async () => {
    let manager = new WorkerManager<WorkerHandlers>();

    // Create the first worker and invoke a method
    let workerId1 = await manager.createWorkerWithHandlers(resolve(__dirname, 'worker.js'));
    let res1 = await manager.call(workerId1, 'processData', '123');

    // Create a second worker and invoke a different method
    let workerId2 = await manager.createWorkerWithHandlers(resolve(__dirname, 'worker.js'));
    let res2 = await manager.call(workerId2, 'calculateSum', 1, 2);

    // Register classes for serialization
    manager.registerClasses([Matrix]);

    // Perform matrix operations involving custom classes
    let m = await manager.call(workerId1, 'matrixOperations', new Matrix(2), new Matrix(3));

    console.log(res1, res2, m);

    // Terminate the workers
    await manager.terminateWorker(workerId1);
    await manager.terminateWorker(workerId2);
})().then(() => console.log('finish'));

Working with Custom Classes

Defining and Registering Classes

You can define custom classes and register them with the WorkerController and WorkerManager. This ensures that instances of these classes are correctly serialized and deserialized when sent between threads.

Example with Recursively Nested Data

export class TreeNode {
    value: number;
    left: TreeNode | null = null;
    right: TreeNode | null = null;

    constructor(value: number) {
        this.value = value;
    }

    addLeft(child: TreeNode) {
        this.left = child;
    }

    addRight(child: TreeNode) {
        this.right = child;
    }

    calculateSum(): number {
        let sum = this.value;
        if (this.left) sum += this.left.calculateSum();
        if (this.right) sum += this.right.calculateSum();
        return sum;
    }
}

export class TreeWorkerHandlers implements Handlers {
    calculateTreeSum(root: TreeNode): number {
        return root.calculateSum();
    }
}

WorkerController.initialize(new TreeWorkerHandlers());
WorkerController.registerClasses([TreeNode]);

In the main thread:

import { WorkerManager } from "../src";
import { TreeNode, TreeWorkerHandlers } from "./treeWorker";
import { resolve } from "path";

(async () => {
    let manager = new WorkerManager<TreeWorkerHandlers>();

    // Register the TreeNode class
    manager.registerClasses([TreeNode]);

    // Create a tree structure
    let root = new TreeNode(10);
    root.addLeft(new TreeNode(5));
    root.addRight(new TreeNode(15));
    root.left?.addLeft(new TreeNode(3));
    root.left?.addRight(new TreeNode(7));

    // Create a worker and calculate the sum of the tree
    let workerId = await manager.createWorkerWithHandlers(resolve(__dirname, 'treeWorker.js'));
    let treeSum = await manager.call(workerId, 'calculateTreeSum', root);

    console.log(`Tree sum: ${treeSum}`); // Expected output: 40

    await manager.terminateWorker(workerId);
})().then(() => console.log('finish'));

Recursive Structures

The SerDe class in the library can handle recursive structures. When serializing, it keeps track of the objects it has visited to avoid infinite loops and correctly reconstructs the object graph upon deserialization.

Key Principles

  1. Serialization and Deserialization: Custom classes and complex data structures are serialized before being passed to the worker and deserialized on the worker side. The result is similarly serialized in the worker and deserialized in the main thread.

  2. Class Registration: Before any class instances can be passed between threads, they must be registered using WorkerManager.registerClasses() and WorkerController.registerClasses().

  3. Thread Management: The library provides straightforward methods for creating, calling, and terminating worker threads, encapsulating the complexity of inter-thread communication.

Error Handling

  • Timeouts: By default, there is a timeout for worker responses, which can be customized or disabled by adjusting the WorkerManager constructor's timeout parameter.

  • Exception Propagation: If an error occurs in the worker thread, it is captured, serialized, and passed back to the main thread, where it can be handled appropriately.

Conclusion

The Worker Threads Manager library provides an efficient and type-safe way to manage worker threads in a Node.js application, with robust support for complex data structures and custom classes. It simplifies the interaction between threads, making it easy to offload heavy computations or parallelize tasks in your applications.


This README.md provides a solid foundation for users of your library, explaining how to get started and showcasing the library's features, including handling complex objects and recursive data structures.