amqp10

noodlefrenzy | 11.2k | MIT | 3.6.0

Native AMQP-1.0 client for node.js

amqp, amqp 1.0, amqp-1-0, amqp10, messaging, queues

readme

amqp10

amqp10 is a promise-based, AMQP 1.0 compliant node.js client

Usage

The basic usage is to require the module, new up a client with the appropriate policy for the server you're connecting against, connect, and then send/receive as necessary. So a simple example for a local Apache Qpid server would look like:

var AMQPClient = require('amqp10').Client,
    Promise = require('bluebird');

var client = new AMQPClient(); // Uses PolicyBase default policy
client.connect('amqp://localhost')
  .then(function() {
    return Promise.all([
      client.createReceiver('amq.topic'),
      client.createSender('amq.topic')
    ]);
  })
  .spread(function(receiver, sender) {
    receiver.on('errorReceived', function(err) {
      // check for errors
    });
    receiver.on('message', function(message) {
      console.log('Rx message: ', message.body);
    });

    return sender.send({ key: "Value" });
  })
  .error(function(err) {
    console.log("error: ", err);
  });

By default, send promises are resolved when a disposition frame is received from the remote link for the sent message; at that point the message is considered "settled". To tune this behavior, you can tweak the policy you give to AMQPClient on construction. For instance, to force send promises to be resolved immediately on successful sending of the payload, you would build AMQPClient like so:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.merge({
  senderLinkPolicy: {
    callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
  }
}, Policy.DefaultPolicy));

In addition to the above, you can also tune how message link credit is doled out (for throttling), as well as most other AMQP behaviors, all through policy overrides. See DefaultPolicy and the policy utilities for more details on altering various behaviors.
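To make the idea of a policy override concrete, here is a minimal sketch of how nested overrides can be layered on top of a base policy. It is an illustration only, not the library's implementation of Policy.merge: the helper names mergeOverrides and isPlainObject are hypothetical, and the base values (100 and 0) are placeholders rather than the real defaults. Only the argument order (overrides first, base second) and the field names come from this README.

```javascript
'use strict';

// Hypothetical helper: true for plain nested objects, false for arrays,
// null, functions and primitives.
function isPlainObject(value) {
  return value !== null && typeof value === 'object' && !Array.isArray(value);
}

// Hypothetical stand-in for Policy.merge(overrides, base). Returns a new
// object; nested override values win, and the base is left untouched.
// This sketch only handles plain objects, functions and primitives.
function mergeOverrides(overrides, base) {
  var result = {};
  Object.keys(base).forEach(function (key) {
    // Copy nested objects so the result never shares state with the base.
    result[key] = isPlainObject(base[key]) ? mergeOverrides({}, base[key]) : base[key];
  });
  Object.keys(overrides).forEach(function (key) {
    result[key] = (isPlainObject(overrides[key]) && isPlainObject(result[key]))
      ? mergeOverrides(overrides[key], result[key])
      : overrides[key];
  });
  return result;
}

// Placeholder base policy; the values are assumptions for illustration.
var basePolicy = {
  receiverLink: {
    creditQuantum: 100,
    attach: { receiverSettleMode: 0 }
  }
};

var merged = mergeOverrides({ receiverLink: { creditQuantum: 1 } }, basePolicy);
console.log(merged.receiverLink.creditQuantum);              // overridden value
console.log(merged.receiverLink.attach.receiverSettleMode);  // inherited from the base
```

The point of the sketch is that an override only needs to name the fields it changes; everything else is inherited from the base policy.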

Flow Control and Message Dispositions

Flow control in AMQP occurs at both the Session and Link layers. Using our default policy, we start out with some sensible Session windows and Link credits, and renew those every time they get to the half-way point. In addition, receiver links start in "auto-settle" mode, which means that the sender side can consider the message "settled" as soon as it's sent. However, all of those settings are easily tunable through Policy overrides (Policy.merge(<overrides>, <base policy>)).

For instance, we've provided a convenience helper for throttling your receiver links to only renew credits on messages they've "settled". To use this with Azure ServiceBus Queues, it would look like:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusQueue));

Here the first number is the initial credit and the second is the threshold: once the remaining credit drops below the threshold, we give out more credit, equal to the number of messages we've settled. In this case we're setting up the client for one-by-one message processing. Behind the scenes, this does the following:

  1. Sets the Link's creditQuantum to the first number (1), which you can do for yourself via the Policy mix-in { receiverLink: { creditQuantum: 1 } }

  2. Sets the Link to not auto-settle messages at the sender, which you can do for yourself via { receiverLink: { attach: { receiverSettleMode: 1 } } }. Where did that magic "1" come from? It's the value from the spec, but you could instead use the constant we've defined at require('amqp10').Constants.receiverSettleMode.settleOnDisposition.

  3. Sets the Link's credit renewal policy to a custom method that renews only when the link credit is below the threshold and we've settled some messages. You can do this yourself by using your own custom method:

    {
      receiverLink: {
        credit: function (link, options) {
          // If the receiver link was just connected, set the initial link credit to the quantum.
          // Otherwise, give more credit for every message we've settled.
          // `threshold` is the second argument passed to RenewOnSettle.
          var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
          if (creditQuantum > 0 && link.linkCredit < threshold) {
            link.addCredits(creditQuantum);
          }
        }
      }
    }
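The renewal rule from the three steps above can be tried out without a broker. The following is a sketch under stated assumptions: makeRenewOnSettleOverride and makeFakeLink are hypothetical names, and the fake link only imitates the fields the credit function reads (policy.creditQuantum, linkCredit, settledMessagesSinceLastCredit, addCredits). In particular, resetting the settled counter inside addCredits is an assumption about the real link's behavior.

```javascript
'use strict';

// Hypothetical factory: builds a receiverLink override that renews credit
// only when credit is below `threshold` and some messages were settled.
function makeRenewOnSettleOverride(initialCredit, threshold) {
  return {
    receiverLink: {
      creditQuantum: initialCredit,
      credit: function (link, options) {
        var quantum = (options && options.initial)
          ? link.policy.creditQuantum
          : link.settledMessagesSinceLastCredit;
        if (quantum > 0 && link.linkCredit < threshold) {
          link.addCredits(quantum);
        }
      }
    }
  };
}

// Stand-in for a receiver link; it records every credit grant.
function makeFakeLink(creditQuantum) {
  var link = {
    policy: { creditQuantum: creditQuantum },
    linkCredit: 0,
    settledMessagesSinceLastCredit: 0,
    granted: [],
    addCredits: function (count) {
      link.linkCredit += count;
      link.settledMessagesSinceLastCredit = 0; // assumed reset on renewal
      link.granted.push(count);
    }
  };
  return link;
}

var override = makeRenewOnSettleOverride(1, 1);
var link = makeFakeLink(override.receiverLink.creditQuantum);
var credit = override.receiverLink.credit;

credit(link, { initial: true });          // attach: grants the initial quantum
link.linkCredit -= 1;                     // a message arrives and consumes the credit
credit(link);                             // nothing settled yet, so no new credit
link.settledMessagesSinceLastCredit += 1; // the application settles the message
credit(link);                             // one message settled, so one credit is granted
console.log(link.granted);
```

With an initial credit of 1 and a threshold of 1, the receiver never has more than one message outstanding, which is the one-by-one processing described above.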

Note that once you've set the policy to not auto-settle messages, you'll need to settle them yourself. We've tried to make that easy by providing methods on the receiver link for each of the possible "disposition states" that AMQP allows:

  • link.accept(message) will tell the sender that you've accepted and processed the message.
  • link.reject(message, [error]) will reject the message with the given error (if provided). The sender is free to re-deliver, so this can be used to indicate transient errors.
  • link.modify(message, [options]) will tell the sender to modify the message and re-deliver. You can tell it you can't accept the message by using link.modify(message, { undeliverableHere: true })
  • link.release(message) will tell the sender that you haven't processed the message and it's free to re-deliver - even back to you.

All of these methods accept an array of messages, allowing you to settle many at once.
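As an illustration of manual settlement, the sketch below wraps a message handler and picks a disposition based on the outcome. The helper settleWith, the recording stub link, and the convention that a handler returns false when it did not process the message are all hypothetical; only accept, reject, and release come from the list above.

```javascript
'use strict';

// Hypothetical helper (not part of amqp10): processes one message and
// settles it using the receiver link's disposition methods.
function settleWith(link, message, handler) {
  var processed;
  try {
    processed = handler(message);
  } catch (err) {
    link.reject(message, err); // processing failed: report the error
    return 'rejected';
  }
  if (processed === false) {
    link.release(message);     // not processed: the sender may re-deliver
    return 'released';
  }
  link.accept(message);        // processed successfully
  return 'accepted';
}

// Stand-in for a receiver link that only records which method was called.
function makeRecordingLink() {
  var link = { calls: [] };
  ['accept', 'reject', 'release'].forEach(function (name) {
    link[name] = function (message) {
      link.calls.push(name + ':' + message.body);
    };
  });
  return link;
}

var recordingLink = makeRecordingLink();
var outcomes = [
  settleWith(recordingLink, { body: 'a' }, function () { return true; }),
  settleWith(recordingLink, { body: 'b' }, function () { return false; }),
  settleWith(recordingLink, { body: 'c' }, function () { throw new Error('bad payload'); })
];
console.log(outcomes, recordingLink.calls);
```

Against a real receiver, the same helper would presumably be called from the 'message' event handler, passing the receiver link as the first argument.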

Plugins

The amqp10 module now supports pluggable Client behaviors with the exported use method. Officially supported plugins include:

  • amqp10-link-cache - caches links with optional purging based on ttl
  • amqp10-rpc - an rpc server/client implementation on top of amqp10

Supported Servers

We are currently actively running integration tests against the following servers:

  1. Azure EventHubs
  2. Azure ServiceBus Queues and Topics
  3. Apache Qpid C++ broker (qpidd)

The client has also been tested against the following servers, though not exhaustively, so issues may remain:

  1. ActiveMQ (open issue: ActiveMQ ignores the auto-settle setting and disposition frames, which may cause messages to be re-delivered or sending to stop after a certain period)
  2. RabbitMQ with the amqp 1.0 experimental extension
  3. Apache Qpid Java broker

If you find any issues, please report them via GitHub.

Todos and Known Issues

  1. Disposition support is incomplete in that we don't send proper "unsettled" information when re-attaching links.
  2. There are some AMQP types we don't process - notably the Decimal32/64/128 types. These are unused by the protocol, and no one seems to be using them to convey information in messages, so ignoring them is likely safe.

Implementation Notes

  • Using node's built-in net/tls classes for communicating with the server.

  • Data from the server is written to a buffer-list based on Rod Vagg's BL.

  • Outgoing data is encoded using this buffer builder. Streaming output won't really work, since each outgoing payload needs to be prefixed with its encoded size; however, we're working on converting to use as much streaming as possible.

  • The connection state is managed using Stately.js, with state transitions swapping which callback gets invoked on receipt of new data. (e.g. post-connection, we write the AMQP version header and then install a callback to ensure the correct version. Once incoming data is written to the circular buffer, this callback is invoked, and a comparison vs. the expected version triggers another transition).

  • Debug output is done via debug with the prefix amqp10:. The main client's debug name is amqp10:client so setting DEBUG=amqp10:client as an environment variable will get you all top-level debugging output.

    bash# export DEBUG=amqp*
    C:\> set DEBUG=amqp*
    [root@pinguino]# node simple_eventhub_test.js
      amqp10:client connecting to: amqps://xxxxxx:xxxxxxxxx@xxxxxxxxxxxx.servicebus.windows.net +0ms
      amqp10:connection Connecting to xxxxxx-service-bus-001.servicebus.windows.net:5671 via TLS +72ms
      amqp10:connection Transitioning from DISCONNECTED to START due to connect +17ms
      amqp10:connection Sending Header 414d515003010000 +405ms
      amqp10:connection Transitioning from START to IN_SASL due to connected +6ms
      amqp10:connection Rx: 414d515003010000 +128ms
      amqp10:sasl Server SASL Version: 414d515003010000 vs 414d515003010000 +1ms
      amqp10:connection Rx: 0000003f02010000005340c03201e02f04b3000000074d535342434... +162ms
      amqp10:client Reading variable with prefix 0xc0 of length 52 +2ms
      amqp10:client Decoding 5340 +0ms
      [...]
  • Many thanks to Gordon Sim for inspiration on the type system, gleaned from his project rhea.
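The version-header check and the size-prefixed frames mentioned in the notes above can be illustrated with the bytes from the debug output. This is a sketch, not the library's code: decodeProtocolHeader and readFrameHeader are hypothetical names, and the layouts follow the AMQP 1.0 specification (an 8-byte protocol header, and an 8-byte frame header holding size, data offset, type, and channel).

```javascript
'use strict';

// Protocol ids defined by AMQP 1.0 for the fifth byte of the header.
var PROTOCOL_IDS = { 0: 'AMQP', 2: 'TLS', 3: 'SASL' };

// Decodes the 8-byte protocol header: "AMQP", protocol id, major, minor, revision.
function decodeProtocolHeader(buf) {
  if (buf.length < 8 || buf.toString('ascii', 0, 4) !== 'AMQP') {
    throw new Error('Not an AMQP protocol header');
  }
  return {
    protocol: PROTOCOL_IDS[buf[4]] || 'unknown',
    version: buf[5] + '.' + buf[6] + '.' + buf[7]
  };
}

// Reads the fixed 8-byte frame header that precedes every frame body.
function readFrameHeader(buf) {
  return {
    size: buf.readUInt32BE(0),        // total frame size in bytes, header included
    dataOffset: buf.readUInt8(4) * 4, // data offset is stored in 4-byte words
    type: buf.readUInt8(5),           // 0 = AMQP frame, 1 = SASL frame
    channel: buf.readUInt16BE(6)
  };
}

// Bytes taken from the "Sending Header" and "Rx:" lines of the debug log.
var sentHeader = Buffer.from('414d515003010000', 'hex');
var receivedHeader = Buffer.from('414d515003010000', 'hex');
var versionsMatch = sentHeader.equals(receivedHeader);
var header = decodeProtocolHeader(receivedHeader);

// First 8 bytes of the frame that follows the header exchange in the log.
var frame = readFrameHeader(Buffer.from('0000003f02010000', 'hex'));

console.log(versionsMatch, header, frame);
```

Because the size field comes first, a sender has to know the full encoded length before writing the first byte, which is why fully streaming output is awkward.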

changelog

3.6.0 (2018-01-26)

Bug Fixes

  • address-parsing: don't truncate ':' passwords (fc84946), closes #318

Features

  • sasl: support custom SASL handlers (2842560)
  • session: add support for multiple sessions per client (8068045)

3.5.3 (2017-05-02)

Bug Fixes

  • policy: fix deprecated link options when using link overrides (25a8ded)
  • sender-link: resolve send promises for certain sndSettleMode (be48605)
  • service-bus-policy: leave default sndSettleMode as mixed (ed8578c)

3.5.2 (2017-04-15)

Bug Fixes

  • policy: reintroduce support for deprecated field names (eb1147d)

3.5.1 (2017-04-14)

Bug Fixes

  • link: properly name settle mode frame fields in policies (f3c4617)

3.5.0 (2017-03-07)

Features

  • encoder: introduce exported Encoder for easy type annotation (0d7e2c3)

3.4.2 (2017-02-27)

Bug Fixes

  • sender-link: use spec-defined calculation for linkCredit (373b736)

Features

  • sender-link: add support for sending with no reply (f0f267c)

3.4.1 (2017-02-07)

Bug Fixes

  • amqp_error: accept non-standard values for error-condition (9a560d2), closes #293
  • ServiceBusPolicy: Avoid jsonifying Buffers (0814658)

3.4.0 (2016-11-27)

Bug Fixes

  • use: throw an error if a plugin does not provide a function (e50566a)

Features

  • connection: terminate connection when frame stream interrupted (5455179)
  • docs: update documentation (#282) (39ed4ab)
  • sender-link: add support for sending with no reply (5e159a9)

3.3.2 (2016-10-27)

Bug Fixes

  • connection: don't require parameters for address on open (1f7f2a0)
  • policy: remove sealing to allow extension of policy methods (d0bdfc7)
  • sender-link: missing Object.keys around unsettled dispatch (b0580b9)
  • sender-link: supplement missing error in rejected dispo (5c4f79a)
  • sender-link: throw an error if there is no active connection (e5bac49)

Features

  • policy: move parseLinkAddress into Policy (d1ae649), closes #267

3.3.1 (2016-10-20)

Bug Fixes

  • deepMerge: support copying Buffers (db6295f), closes #268
  • sasl: pass errors to callback, don't throw (c5b11ee)

3.3.0 (2016-10-18)

Bug Fixes

  • connection: use connectPolicy not this.policy.connect (3a1d4a3)
  • detach: don't always reject promise if !close (8c2e79e)
  • heartbeat: local vs remote idleTimeouts (8f20e2d)
  • MockAmqp: process batch socket data (51c16b3)
  • sasl: Make private saslMechanisms singular (1ef4e17)
  • sender-link: settle send promises on client disconnect (98bd734)

Features

  • ReceiverLink: emit transfer frame optionally on message (cd59fec)
  • sasl: Add support for SASL ANONYMOUS. (d067202), closes #250
  • types: allow DescribedType as message.body (b470e2b)

3.2.5 (2016-08-24)

Bug Fixes

  • defaultSubjects: throw error on invalid subjects (314b495)

3.2.4 (2016-08-19)

Bug Fixes

  • link: emit detached event when force detached locally (d2531ba)

3.2.3 (2016-08-11)

Bug Fixes

  • encoding: defaults for composite types should use type def (8e76ae5)

3.2.2 (2016-07-14)

Bug Fixes

  • session: force detach links in all possible active states (3a7272d)

3.2.1 (2016-07-08)

Bug Fixes

  • session: ensure deep copies are made from policies (2cb0797)

3.2.0 (2016-06-22)

Features

  • connected-disconnected: add signals for connection state (4f8d4ff)

3.1.6 (2016-06-16)

Bug Fixes

  • errors: typo in utilities.wrapProtocolError, errorInfo => info (91e229f)

Features

  • ssl: allow user to pass ssl options directly to transport (68c4a91)

3.1.5 (2016-05-31)

Bug Fixes

  • sender-link: raw message payloads should merge link options (8e48a3f)

3.1.4 (2016-04-04)

Features

  • dynamic-links: prepend dynamic links with dynamic_ (69960e0)

3.1.3 (2016-04-03)

Bug Fixes

  • wrapProtocolError: symbols are now plain strings (8d3fb8b)

3.1.2 (2016-04-01)

Bug Fixes

  • idleTimeout: incorrect variable name for idleTimeout in frame (8cb7e74)

3.1.1 (2016-03-27)

Features

  • use: allow users to plugin Client behaviors (8c7573f)

3.1.0 (2016-03-11)

Bug Fixes

  • address-parsing: support basic hostnames in address parsing (757f0d2)
  • application-properties: make properties accessible (ab8829a)
  • attach-frame: properly encode properties as AMQP fields type (b9eb18f)
  • eh-integration: refactor tests to fix after types refactor (1dc2bd1)
  • encoder: encode message body's with 0 as a value (9aa9f66)
  • frames: consume bytes before heartbeat short circuit (5f8b524)
  • frames: only consume buffer after successful frame type read (3e54a73)
  • link-reattach: ensure sessions don't reattach after disconnect (8eeeb5b)
  • longs: temporarily fix encoding longs in master (c9e4810)
  • multi-transfer: fix issue with sending multi transfer frames (6376e9d)
  • session: use console.warn instead of debug for warning message (e7c6367)
  • tools: update formatting tool to accommodate new debug style (b4c9567)
  • translator: ensure keys aren't objects for map types (442e24c)
  • types: fixed issues based on peer review (2b00efc)
  • types: temporary paths for incorporating new types as known (0b4a412)
  • ulong: decode smallulong and ulong0 as normal numbers (b2416a0)

Features

  • receiver-stream: add prototype for ReceiverStream (d52b8f4)
  • sender-stream: implement basic sender stream, add tests (648a4b7)

2.2.0 (2015-12-10)

Bug Fixes

  • long-ulong-decoding: if we can represent it in js then do so (04bfd67)

2.1.3 (2015-12-04)

Bug Fixes

  • ActiveMQPolicy: remove erroneous message about default idle (3914a63)
  • addCredits: don't return promise in ReceiverLink.addCredits (04246cf)
  • auto-reattach: temporarily disable blind auto reattaches (2e1aa25)
  • cached-receivers: resolve receiver links that already exist (427e48d)
  • canSend: move state check to canSend based on review (06e0edc)
  • client: actually use the idleTimeout provided by the broker (4e4b512)
  • client: clean up create[Sender,Receiver] preconditions, style (5809f68)
  • client: connect promise should resolve if reconnect is needed (faef9a7), closes #190
  • client: only listen for Session.Mapped once during connect (0cfd687)
  • client: remove deprecated _pendingSends (9ffe830)
  • client: remove deprecated _sendMsgId (bfd1910)
  • client: remove deprecated _unsettledSends (ed6c225)
  • Client: set connect options hostname to vhost if it exists (119bf49)
  • client-test: remove unneeded send parameter (43609fd)
  • codec: encode Modified state, throw error on invalid type (271415c), closes #176
  • codec: if a function is passed to encode then run it (ff05c74)
  • debug-output: remove debug output for generating timeouts (ef2c0f4)
  • DefaultPolicy: limit userPass split to 2 (2e4b00f)
  • deferredAttach: return reject, or resolve (64f2e8c)
  • deffered-attach: missed this references during promisification (e4f6023)
  • disposition: only send disposition to relevant links by role (f93f2a7)
  • disposition-test: pass proper link policy, fix deepMerge (b782d32)
  • disposition-test: use this instead of receiver (9b6a47e)
  • disposition-tests: remove superfluous address in send (cc51396)
  • DispositionFrame: use correct channel for disposition frame (70f9b4c)
  • frame-encodings: remove unused require statements (37f8ef2)
  • int64-coerce: node 4 doesn't allow us to pass Int64s to Buffer (347070c)
  • link: promisify detach, properly send closed flag in frame (158a221)
  • Link: properly call _detached on force detach to cleanup (79544c4)
  • link-addCredits: remove listener on resolve for addCredits (07fdacd)
  • link-attach: fix attachment messaging issues due to deps (f2b6d5e)
  • link-attach: fix attachment messaging issues due to deps (38d6d59)
  • link-attach-test: properly send back closed in DetachFrame (151d101)
  • link-creation: remove support for tracking duplicate links (ad602d2)
  • link-deatch: removeListener upon detach (05f1413)
  • link-debug: spelling mistake in debug message (ef6cd17)
  • link-options: changes based on review (d547ef4)
  • linting: add jshint for unused variables, delint codebase (48a9606)
  • list32: fix test for decoding list32 types (96b2b2b)
  • message-properties: add missing field and correct null values (f8252b1)
  • message-properties: added missing amqp symbols for two fields (d745d85)
  • message-properties: only access contents if symbol read (fa77d04)
  • message-properties: properly convert symbols from described (746f439)
  • MockClient: bug in removeAllListeners for node 4.x (04893e7)
  • MockServer: handle cases where incomplete data is received (45ecd2d)
  • MockServer: use proper address, actually send messages (505bf5e)
  • multiple-receivers: register callback when attaching (1a991b3)
  • parseLinkAddress: take policy into account when parsing (b8fd0f9)
  • reattach: check for existence of pendingSends before reattach (a9953e1)
  • reconnect: rejection should only happen when no reconnect left (47e3a98)
  • SaslInit: use coerce for mechanisms member (abb81bd)
  • SaslInitFrame: mechanism must be an amqp symbol (b8c7e84)
  • send-object: allow users to send an object as message body (b38ace9)
  • SenderLink: delete unsettled send on disposition (5453362)
  • SenderLink: temporarily remove transfersInFlight (d8738cc)
  • SenderLink: update multi-part messages according to comments (b3d5fb9)
  • SenderLink: use defaultSubject from policy, added test (3b676a2)
  • session: flow frames with no handle are not errors (505e803)
  • TransferFrame: accidentally forced type for delivery tag (91e9fd3)
  • travis: get code climate working again (976cb25)
  • typo: attachingListner => attachingListener (4781f1d)

Features

  • ActiveMQ: add ActiveMQ policy (5a718a3)
  • AMQPClient: provide access to message options on send/receive (75c4102)
  • auto-settle: send disposition frames for auto-settle (1a3473d)
  • benchmarks: add basic benchmarks for frameread and codec (53854f5)
  • benchmarks: add incredibly simple baseline benchmark tool (7dbee49)
  • char: add basic support for the AMQP char type (eb212f6)
  • Client: remove receipt callback on createReceiver (88e95b2)
  • connection-errors: add some connection-related errors (f465c3e)
  • constants: export Constants to the user (6c828b7)
  • createSender: add createSender to the Client class (abde6af)
  • default-subject: allow specifying subjects in link names (d12313d)
  • deliveryTag: track delivery tag in Session for shared links (62a73bd)
  • disposition: add accept and reject disposition methods (423952e)
  • disposition-test: add test for auto-settling messages (5e42990)
  • errors: export our errors for user consumption (cab5120)
  • exports: open up the possibility of exporting more classes (6700983)
  • failover: add exponential and fibonacci backoff failover (6d9dba6)
  • generateTimeouts: add utility for generating timeouts (47bdd54)
  • integration-config: allow override of test server (1cd72a8)
  • link: add _onAttach deferred attach handler (f442604)
  • Link: add Link.Attached signal (21fd06f)
  • link-addCredits: make addCredits public and promisified (4d1899a)
  • manual-disposition-test: add test case for manual disposition (c3d68e1)
  • MockServer: add refactored MockServer (3652c5c)
  • object-hash: add node-object-hash and add utility functions (a9d616a)
  • parseLinkAddress: add utility for parsing a link address (c50f497)
  • qpid-tests: add skeleton for qpid integration tests (a37748a)
  • qpid-tests: improve Client message options test coverage (9729b3f)
  • QpidJavaPolicy: add policy for qpid java broker (14c3913)
  • ReceiverLink: add Received and Modified disposition support (3bd2500)
  • SenderLink: accept a full message as the first param for send (010aebd)
  • SenderLink: support splitting messages into multiple frames (0135c3a)
  • unique-link-names: use uniquely generated link names (1b50b59)
  • utilities: add dispositionRange method (0f5963a)
  • uuid: add support for encoding/decoding uuid types (7363a17)
  • vhosts: add vhosts specified in address to sasl init frame (c95b741)

Performance Improvements

  • tests: dramatically speed up test runs (cbdece9)