@attestate/extraction-worker

attestate · 224 · GPL-3.0-only · 0.7.3

A component that accepts JSON objects to execute simple or complex extraction maneuvers.

worker, requests, fetch, queue, batch, extraction

The extraction-worker is a component that accepts JSON objects to execute simple or complex extraction maneuvers. It was originally conceived as a utility tool for @neume-network/extraction-worker, but it is useful for any task that requires downloading large amounts of data from various endpoints.

The extraction worker supports: JSON-RPC, GraphQL, HTTPS, Arweave and IPFS.

Installation

npm i @attestate/extraction-worker
npm i eth-fun --no-save # to install eth-fun as a peer dependency

Usage

For more on extraction-worker configuration options, see the crawler's main documentation page.

To execute a single message, use the execute function.

import { execute } from "@attestate/extraction-worker";

const message = {
  version: "0.0.1",
  type: "json-rpc",
  method: "eth_getTransactionReceipt",
  params: [
    "0xed14c3386aea0c5b39ffea466997ff13606eaedf03fe7f431326531f35809d1d",
  ],
  options: {
    url: "https://..."
  }
};

const outcome = await execute(message);

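// A bare `return` is a syntax error at the top level of an ES module,
// so branch on the outcome instead.
if (outcome.results) {
  console.log(outcome.results);
} else {
  console.error(outcome.error);
}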
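
The outcome check above can also be wrapped in a small helper. This is a minimal sketch that assumes only what the example shows: an outcome carries `results` on success and `error` on failure. The name `unwrapOutcome` is hypothetical and not part of @attestate/extraction-worker.

```javascript
// Hypothetical helper (not exported by @attestate/extraction-worker).
// Assumption: a failed outcome has falsy `results` and describes the
// problem in its `error` property.
function unwrapOutcome(outcome) {
  if (!outcome || !outcome.results) {
    const reason = outcome && outcome.error ? outcome.error : "unknown error";
    throw new Error(`Extraction failed: ${reason}`);
  }
  return outcome.results;
}
```

It could be used as `const results = unwrapOutcome(await execute(message));`. Throwing instead of returning early also works at the top level of an ES module.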

Alternatively, you can use the extraction worker to stream tasks. For that, you'll need to create a worker execution module:

worker_start.mjs

import "dotenv/config";
import { Worker, isMainThread, workerData } from "worker_threads";

import logger from "./logger.mjs";
import { run } from "@attestate/extraction-worker";

const log = logger("start");
const module = {
  defaults: {
    workerData: {
      queue: {
        options: {
          concurrent: 1,
        },
      },
    },
  },
};

if (isMainThread) {
  log("Detected mainthread: Respawning extractor as worker_thread");
  // INFO: When the main thread is detected, we relaunch this file as a
  // `Worker`. This is useful when running it without an accompanying process.
  // `__filename` is not defined in ES modules (.mjs), so we pass the
  // module's own URL instead.
  new Worker(new URL(import.meta.url), {
    workerData: module.defaults.workerData,
  });
} else {
  run();
}

You can then execute it as follows:

import { once } from "events";
import { Worker } from "worker_threads";

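// Assumption: worker_start.mjs (from above) is located next to this file.
const workerPath = new URL("./worker_start.mjs", import.meta.url);
const worker = new Worker(workerPath, {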
  workerData: {
    queue: {
      options: {
        concurrent: 1,
      },
    },
  },
});

const message = {
  version: "0.0.1",
  type: "json-rpc",
  method: "eth_getTransactionReceipt",
  params: [
    "0xed14c3386aea0c5b39ffea466997ff13606eaedf03fe7f431326531f35809d1d",
  ],
  options: {
    url: "https://..."
  }
};
worker.postMessage(message);
// Wait for the worker's reply to the message posted above.
const [outcome] = await once(worker, "message");

if (outcome.results) {
  console.log(outcome.results);
} else {
  console.error(outcome.error);
}
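
When streaming many tasks, it can help to build the messages programmatically. The sketch below reuses the message shape from the example above. `receiptMessage` and `receiptMessages` are hypothetical helpers, not part of the package, and the endpoint URL is supplied by the caller.

```javascript
// Hypothetical helper: builds one json-rpc message for a transaction hash,
// mirroring the fields used in the example above.
function receiptMessage(txHash, url) {
  return {
    version: "0.0.1",
    type: "json-rpc",
    method: "eth_getTransactionReceipt",
    params: [txHash],
    options: { url },
  };
}

// Hypothetical helper: builds one message per transaction hash.
function receiptMessages(txHashes, url) {
  return txHashes.map((txHash) => receiptMessage(txHash, url));
}
```

Each resulting message can then be handed to the worker with `worker.postMessage(message)`, as in the example above.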

License

GPL-3.0-only, see LICENSE file for details.

Changelog

0.7.3

  • Add getTransactionByHash call

0.7.2

  • Add async function execute(message, concurrency) to allow usage of extraction worker without events architecture

0.7.1

  • Pass headers in ipfs message to gateway

0.7.0

  • (breaking) For unparsable JSON, we now return the text response as results if the HTTP header Content-Type includes json.
  • Add type: ipfs worker message handling
  • Add type: arweave worker message handling

0.6.1

0.6.0

  • (breaking) Switch from better-queue to fastq. Users pinning a version below 0.6.0 had to expect string-only errors in return messages. With fastq, a failure now always returns the message together with a potential error message. The implementation of the function panic(...) shows this.

0.5.2 (mistake and later unpublished)

  • Switch from better-queue to fastq

0.5.1

  • Add endpoints property that allows setting an endpoint-specific timeout and rate limit

0.5.0

  • (breaking) Upon failures in the worker/queue, extraction-worker attempts to return as much context as possible to the user, e.g. by sending the augmented message object (with the error property filled out). This may have been broken in earlier versions.

0.4.0

0.3.2

  • Add eth_getLogs translation

0.3.1

  • Properly pass numerical timeout value in milliseconds to setTimeout.

0.3.0

  • (breaking) Pass entire queue options configuration through workerData.
  • (breaking) Make eth-fun a peerDependency.
  • Allow inspecting the queue's statistics through the DEBUG environment variable.
  • Improve internal error handling.
  • Upgrade to @neume-network/message-schema@0.3.1, which includes the timeout property, and implement timeouts with AbortSignal.
  • Improve error messages for messages of type https

0.2.0

0.1.0

  • Re-release as @neume-network/extraction-worker

0.0.3

0.0.2

  • Add graphql job type

0.0.1

  • Initial release