Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

azure-event-hubs

azure540MITdeprecated0.2.11TypeScript support: included

This package has been deprecated. Please use @azure/event-hubs instead

Azure Event Hubs SDK for Node.js

readme

azure-event-hubs

Deprecation warning

This package has been deprecated. Please use @azure/event-hubs instead.

Please install:

npm i @azure/event-hubs

Azure Event Hubs is a scalable event processing service that ingests and processes large volumes of events and data, with low latency and high reliability. More information about Azure Event Hubs can be found over here.

This sdk provides a convenient way to interact with the Azure Event Hubs service.

Pre-requisite

  • Node.js version: 8.x or higher. We would encourage you to install the latest available LTS version at any given time from https://nodejs.org. Please do not use older LTS versions of node.js.

Installation

npm install azure-event-hubs

Client creation

The simplest usage is to use the static factory method EventHubClient.createFromConnectionString(_connection-string_, _event-hub-path_). Once you have a client, you can use it for:

Sending events

  • You can send a single event using client.send() method.
  • You can even batch multiple events together using client.sendBatch() method.

Receiving events

There are two ways to receive events using the EventHub Client.

Streaming receiver

The EventHubClient has a client.receive(. . .) method on the receiver. This message takes the messageHandler() and the errorHandler() amongst other parameters and registers them to the receiver. This method returns a ReceiverHandler that can be used to stop receiving further events await receiverHandler.stop() This mechanism can be useful in a scenario, where you want to continuously receive events/messages at a high speed.

Batching receiver

You can use await client.receiveBatch(...) to receive desired number of events for specified amount of time. It will return an array of EventData objects once it receives the desired number of events or the max wait time occurs (which ever happens first). This mechanism can be useful when you want to receive events/messages in a batch. If your in a scenario where you would like to receive some messages and process them (since message processing is time consuming), and later get some more messages, then this mechanism will suite your needs better.

Note: For scalable and efficient receiving, please take a look at azure-event-processor-host. The Event Processor host, internally uses the streaming receiver to receive messages.

IDE

This sdk has been developed in TypeScript and has good source code documentation. It is highly recommended to use vscode or any other IDE that provides better intellisense and exposes the full power of source code documentation.

Debug logs

You can set the following environment variable to get the debug logs.

  • Getting debug logs from the Event Hub SDK
    export DEBUG=azure*
  • Getting debug logs from the Event Hub SDK and the protocol level library.
    export DEBUG=azure*,rhea*
  • If you are not interested in viewing the message transformation (which consumes lot of console/disk space) then you can set the DEBUG environment variable as follows:
    export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
  • If you are interested only in errors, then you can set the DEBUG environment variable as follows:
    export DEBUG=azure:event-hubs:error,azure-amqp-common:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Logging to a file

  • Set the DEBUG environment variable as shown above and then run your test script as follows:
    • Logging statements from you test script go to out.log and logging statement from the sdk go to debug.log.
      node your-test-script.js > out.log 2>debug.log
    • Logging statements from your test script and the sdk go to the same file out.log by redirecting stderr to stdout (&1), and then redirect stdout to a file:
      node your-test-script.js >out.log 2>&1
    • Logging statements from your test script and the sdk go to the same file out.log.
        node your-test-script.js &> out.log

Examples

Please take a look at the examples directory for detailed examples.

Example 1 - Get the partition IDs.

const { EventHubClient } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  const partitionIds = await client.getPartitionIds();
}

main().catch((err) => {
  console.log(err);
});

Example 2.1 - Receive events with handlers

This mechanism is useful for receiving events for a longer duration.

Receive events from partition ID 1 after the current time.

const { EventHubClient, EventPosition } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  const onError = (err) => {
    console.log("An error occurred on the receiver ", err);
  };

  const onMessage = (eventData) => {
    console.log(eventData.body);
    const enqueuedTime = eventData.annotations["x-opt-enqueued-time"];
    console.log("Enqueued Time: ", enqueuedTime);
  };

  const receiveHandler = client.receive("1", onMessage, onError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });

  // To stop receiving events later on...
  await receiveHandler.stop();
}

main().catch((err) => {
  console.log(err);
});

Example 2.2 - Receive specified number of events for a given time

This mechanism is useful when you want to see how the received events look like. It can also be useful for debugging purpose.

Receive events from partitionId "1" after the current time.

const { EventHubClient, EventPosition } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  const datas = await client.receiveBatch("1", 100 /*number of events*/, 20 /*amount of time in seconds the receiver should run. Default 60 seconds.*/, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
  console.log("Array of EventData objects", datas);
}

main().catch((err) => {
  console.log(err);
});

Example 3 - Send an event with partition key.

Send an event with a given "partition-key" which is then hashed to a partition ID (so all events with the same key will go to the same ID, but load is balanced between partitions).

const { EventHubClient, EventPosition } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  // NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub where the body is a JSON object.
  // const eventData = { body: { "message": "Hello World" }, partitionKey: "pk12345"};
  const eventData = { body: "Hello World", partitionKey: "pk12345"};
  const delivery = await client.send(eventData);
  console.log("message sent successfully.");
}

main().catch((err) => {
  console.log(err);
});

Example 4 - Send an event to a specific partition id.

Send an event to a specific partition ID if needed. If not specified then EventHub will store the events in the partition in a round-robin pattern.

const { EventHubClient, EventPosition } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  // NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub where the body is a JSON object/array.
  // const eventData = { body: { "message": "Hello World" } };
  const data = { body: "Hello World 1", message_id: "343-0909-5454-23423-54543" };
  const delivery = await client.send(data, "1");
  console.log("message sent successfully.");
}

main().catch((err) => {
  console.log(err);
});

Example 5 - Send multiple events as a batch.

Send multiple events grouped together.

const { EventHubClient, EventPosition } = require('azure-event-hubs');

const client = EventHubClient.createFromConnectionString(process.env["EVENTHUB_CONNECTION_STRING"], process.env["EVENTHUB_NAME"]);

async function main() {
  const datas = [
    { body: "Hello World 1", applicationProperties: { id: "Some id" }, partitionKey: "pk786" },
    { body: "Hello World 2" },
    { body: "Hello World 3" }
  ];
  // NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub
  // where the body is a JSON object/array.
  // const datas = [
  //   { body: { "message": "Hello World 1" }, applicationProperties: { id: "Some id" }, partitionKey: "pk786" },
  //   { body: { "message": "Hello World 2" } },
  //   { body: { "message": "Hello World 3" } }
  // ];
  const delivery = await client.sendBatch(datas);
  console.log("message sent successfully.");
}

main().catch((err) => {
  console.log(err);
});

Example 6 - Create an EventHubClient from an IotHub connection string.

Create EventHub Client from an IotHub Connection string. This is useful for receiving telemetry data of IotHub from the linked EventHub. Most likely the associated connection string will not have send claims. Hence getting HubRuntimeInfo or PartitionRuntimeInfo and receiving messages would be the possible operations.

  • Please notice that we are awaiting on the createFromIotHubConnectionString() method to get an instance of the EventHubClient. This is different from other static methods on the client. The method talks to the IotHub endpoint to get a redirect error which contains the EventHub endpoint to talk to. It then constructs the right EventHub connection string based on the information in the redirect error and returns an instance of the EventHubClient that you can play with. `js const { EventHubClient } = require('azure-event-hubs');

async function main() { const client = await EventHubClient.createFromIotHubConnectionString(process.env["IOTHUB_CONNECTION_STRING"]); const hubInfo = await client.getHubRuntimeInformation(); console.log(hubInfo); await client.close(); }

main().catch((err) => { console.log(err); }); `

AMQP Dependencies

It depends on rhea library for managing connections, sending and receiving events over the AMQP protocol.

changelog

2018-09-11 0.2.10

  • Added support to provide custom user-agent string that will be appended to the default user agent string.

2018-09-11 0.2.9

  • Updated examples and content in README.md

2018-08-31 0.2.8

  • Fixed issue
    • Added error handlers to the $management sender/receiver links.
    • Added error handlers to the amqp session of the $management and $cbs sender/receiver links.
  • Exported AadTokenProvider and SasTokenProvider from the client.

2018-08-29 0.2.7

  • Improved logging statements to the connection context.
  • Added timeout to promisifed creation/closing of rhea sender, receiver, session, connection.
  • Fixed a bug in the EventData deserialization logic by checking for != undefined check rather than the ! check.
  • While handling disconnects we retry for 150 times at an interval of 15 seconds as long the error is retryable.

2018-08-07 0.2.6

  • Improved log statements.
  • Documented different mechanisms of getting the debug logs in README.
  • Minimum dependency on "rhea": "^0.2.18".
  • Fixed bugs in recovery logic
  • Added support to recover from session close for sender and receiver
  • Added a new property isConnecting that provides information whether a linkEntity is currently in the process of establishing itself.
  • Using is_closed() method of sender, receiver and session in rhea to determine whether the sdk initiated the close.
  • MessagingError is retryable by default.
  • Added support to translate node.js SystemError into AmqpError.
  • Added a new static method createFromTokenProvider() on the EventHubClient where customers can provide their own TokenProvider.

2018-07-17 0.2.5

  • Improved log statements
  • Updated README.md
  • Updated dependency rhea to "^0.2.16" instead of github dependency.

2018-07-16 0.2.4

  • Added support to handle disconnects and link timeout errors.
  • Fixed client examples link in README.
  • Updated issue templates
  • Improvised the example structure
  • Moved the common stuff to amqp-common and added Connection, Session, Sender, Receiver objects to rhea-promise.
  • Improved tsconfig.json and tslint.json config files.
  • Added import "mocha" to all the test files, inorder to get rid of red squiggles in vscode.
  • Replaced crypto with jssha which is browser compatible

2018-06-13 0.2.3

  • Minor doc fixes and sample updates.
  • Add a listener for the disconnected event after opening the connection.

2018-05-23 0.2.2

  • Fixed the partitionkey issue while sending events. #73.
  • Bumped the minimum dependency on rhea to 0.2.13. This gives us type definitions for rhea.
  • rpc.open() returns the connection object. This makes it easy to extract common functionality to a separate library.

2018-05-09 0.2.1

  • Added support to create EventHubClient from an IotHub connectionstring. The following can be done
    const client = await EventHubClient.createFromIotHubConnectionString(process.env.IOTHUB_CONNECTION_STRING);
  • Internal design changes:
    • ManagementClient also does cbs auth before making the management request.
    • EventHubSender, EventHubReceiver, ManagementClient inherit from a base class ClientEntity.
    • Moved opening the connection to CbSClient as that is the first thing that should happen after opening the connection. This reduces calls to rpc.open() all over the sdk and puts them at one place in the init() method on the CbsClient.

2018-05-02 0.2.0

  • Added functionality to encode/decode the messages sent and received.
  • Created an options object in the client.createFromConnectionString() and the EventHubClient constructor. This is a breaking change. However moving to an options object design reduces the chances of breaking changes in the future. This options object will:
    • have the existing optional tokenProvider property
    • and a new an optional property named dataTransformer. You can provide your own transformer. If not provided then we will use the DefaultDataTransformer. This should be applicable for majority of the scenarios and will ensure that messages are interoperable between different Azure services. It fixes issue #60.

2018-04-26 0.1.2

  • Added missing dependency for uuid package and nit fixes in the README.md

2018-04-24 0.1.1

  • Changing client.receiveOnMessage() to client.receive() as that is a better naming convention and is in sync with other language sdks.

2018-04-23 0.1.0

  • Previously we were depending on amqp10 package for the amqp protocol. Moving forward we will be depending on rhea.
  • The public facing API of this library has major breaking changes from the previous version 0.0.8. Please take a look at the Readme and the examples directory for detailed samples.
  • Removed the need to say client.open.then(). First call to create a sender, receiver or get metadata about the hub or partition will establish the AMQP connection.
  • Added support to authenticate via Service Principal credentials, MSITokenCredentials, DeviceTokenCredentials.
    • This should make it easy for customers to login once using the above mentioned credentials,
      • Create the EventHubs infrastructure on the Azure management/control plane programmatically using (azure-arm-eventhubs) package over HTTPS prtocol.
      • Use the same credentials to send and receive messages to the EventHub using this library over AMQP protocol.
  • Provided a promise based API to create senders/receivers off the EventHubClient.
  • Added capability to send multiple messages by batching them together.
  • Added capability to receive predefined number of messages for a specified amount of time. Note that this method will receive all the messages and return an array of EventData objects.
  • Added capability to create an epoch receiver.
  • Simplified the mechanism to specify the EventPosition from which to receive messages from the EventHub.
  • Added proper TypeScript type definitions to the library that improves the intellisense experience for our customers.

2017-05-18 0.0.8

  • Fixed a race condition within the AMQP redirection code when using an IoT Hub connection string.
  • Disabled auto-retry of AMQP connections in amqp10 since the current client is not built to handle them and fails when retrying.

2017-03-31 0.0.7

  • Pulled changes for #14 and #20/#21.
  • Special thanks to @kurtb and @ali92hm for their contributions!

2017-01-13 0.0.6

  • Added support for message properties in the EventData structure.