
kafka-streams

nodefluent · 2.2k · MIT · 5.0.0 · TypeScript support: included

kafka-streams for Node.js

Keywords: kafka-streams, kafka, streams, streaming, topics, produce, consume, merge, join, map, flat, filter, reduce, flink, window, combine


node-kafka-streams


Suggested Node.js version: v12.16.1

```shell
npm install --save kafka-streams
```

```javascript
const { KafkaStreams } = require("kafka-streams");

const config = require("./config.json");
const factory = new KafkaStreams(config);

const kstream = factory.getKStream("input-topic");
const ktable = factory.getKTable(/* .. */);

kstream.merge(ktable).filter(/* .. */).map(/* .. */).reduce(/* .. */).to("output-topic");
```

CHANGES: The latest version brings a lot of changes, please check here before updating.

API Overview

You might also like

README Overview

Prerequisites

  • Kafka broker should be version >= 0.11.x
  • Node.js should be version >= 8.x.x

A note on native mode

If you are using the native mode (config: { noptions: {} }), you will have to manually install node-rdkafka alongside kafka-streams. This requires a Node.js version between 9 and 12 and will not work with Node.js >= 13 (last tested with 12.16.1).

On macOS High Sierra / Mojave:

```shell
CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib yarn add --frozen-lockfile node-rdkafka@2.7.4
```

Otherwise:

```shell
yarn add --frozen-lockfile node-rdkafka@2.7.4
```

Please also note: doing this with npm does not work, as it will remove your dependencies. Install Yarn first via `npm i -g yarn`.
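
For orientation, here is a minimal sketch of what a native-mode config can look like. The broker address, group id and client id are placeholder assumptions; the keys inside `noptions` follow librdkafka's configuration property names:

```javascript
// Hedged sketch: a non-empty `noptions` object switches kafka-streams into
// native (node-rdkafka) mode. All values below are placeholders.
const nativeConfig = {
  noptions: {
    "metadata.broker.list": "localhost:9092",   // placeholder broker
    "group.id": "kafka-streams-example",        // placeholder consumer group
    "client.id": "kafka-streams-example-client"
  },
  tconf: {
    "auto.offset.reset": "earliest"
  }
};
```

Leaving `noptions` out entirely keeps the library on the plain JS client.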

Aim of this Library

  • this is not a 1:1 port of the official Java Kafka Streams
  • the goal of this project is to give Node.js developers at least the same options that Kafka Streams provides for JVM developers
  • stream-state processing, table representation, joins, aggregations etc.; the aim is the easiest possible API access, so check out the word count example
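
The counting at the heart of the word-count example can be sketched without a broker. This is plain JavaScript mirroring the steps the stream pipeline performs; the real example expresses the same logic with kstream operators instead of arrays:

```javascript
// Plain-JS sketch of the word-count logic (no Kafka involved): split each
// line into words, then count occurrences per word, much like a KTable
// aggregation would.
const lines = ["kafka streams for node", "kafka for node"];

const counts = lines
  .flatMap(line => line.split(" "))   // ~ the stream's flat-map stage
  .reduce((table, word) => {          // ~ a countByKey-style aggregation
    table[word] = (table[word] || 0) + 1;
    return table;
  }, {});

// counts → { kafka: 2, streams: 1, for: 2, node: 2 }
```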

Description

kafka-streams :octopus: equivalent for Node.js :sparkles::turtle::rocket::sparkles: built on super fast :fire: observables using most.js :metal:

ships with sinek :pray: for backpressure

comes with a JS and a native Kafka client, for more performance plus SSL, SASL and Kerberos features

the lib also comes with a few window operations that are more similar to Apache Flink's, yet they still feel natural in this API :squirrel:

an overwritable local-storage solution allows for any kind of datastore, e.g. RocksDB, Redis, Postgres.
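
As a hypothetical sketch of such a swap (the method names and the async get/set contract here are assumptions for illustration; a real implementation would delegate to RocksDB, Redis, Postgres etc. instead of a Map):

```javascript
// Hypothetical KStorage-style store backed by an in-memory Map.
// Method names are assumed; the point is the async key-value contract.
class MapStorage {
  constructor() {
    this.map = new Map();
  }
  async set(key, value) {
    this.map.set(key, value);
    return value;
  }
  async get(key) {
    return this.map.get(key);
  }
  async delete(key) {
    this.map.delete(key);
  }
}
```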

async (Promises) and sync stream operators e.g. stream$.map() or stream$.asyncMap()
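
The distinction can be simulated on a plain array rather than a live stream (operator names taken from the line above):

```javascript
// Sketch: .map takes a synchronous function, .asyncMap a Promise-returning
// one. Simulated here on a plain array instead of a live stream.
const syncMap = (events, fn) => events.map(fn);
const asyncMap = (events, fn) => Promise.all(events.map(e => fn(e)));

syncMap([1, 2, 3], v => v * 2);          // → [2, 4, 6]
asyncMap([1, 2, 3], async v => v * 2);   // resolves to [2, 4, 6]
```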

super easy API :goberserk:

the lib is based on sinek, which is based on kafka-node's ConsumerGroups

Port Progress Overview

  • [x] core structure
  • [x] KStream base - stream as a changelog
  • [x] KTable base - stream as a database
  • [x] KStream & KTable cloning
  • [x] complex stream join structure
  • [ ] advanced joins
  • [ ] windows (for joins)
  • [ ] flink like window operations
  • [x] word-count example
  • [x] more examples
  • [x] local-storage for etl actions
  • [x] local-storage factory (one per action)
  • [ ] KStorage example for any DB that supports atomic actions
  • [ ] backing-up local-storage via kafka
  • [x] kafka client implementation
  • [x] KTable replay to Kafka (produce)
  • [x] stream for topic message production only
  • [x] sinek implementation
  • [x] backpressure mode for KafkaClient
  • [x] auto-json payloads (read-map/write-map)
  • [x] auto producer partition and keyed-message handling
  • [x] documentation
  • [x] API description
  • [ ] higher join & combine examples
  • [x] embed native client librdkafka for more performance
  • [x] SSL
  • [x] SASL
  • [x] Kerberos

Operator Implementations

  • [x] map
  • [x] asyncMap
  • [x] constant
  • [x] scan
  • [x] timestamp
  • [x] tap
  • [x] filter
  • [x] skipRepeats
  • [x] skipRepeatsWith
  • [x] slice
  • [x] take
  • [x] skip
  • [x] takeWhile
  • [x] skipWhile
  • [x] until
  • [x] since
  • [x] reduce
  • [x] chainReduce
  • [x] forEach (observe)
  • [x] chainForEach
  • [x] drain
  • [x] _zip
  • [x] _merge
  • [x] _join
  • [x] _combine
  • [x] _sample
  • [x] throttle
  • [x] debounce
  • [x] delay
  • [x] multicast
  • A description of the operators can be found here
  • Missing an operator? Feel free to open an issue :cop:

Additional Operators

  • [x] mapStringToArray
  • [x] mapArrayToKV
  • [x] mapStringToKV
  • [x] mapParse
  • [x] mapStringify
  • [x] atThroughput
  • [x] mapWrapKafkaPayload
  • [x] mapToFormat
  • [x] mapFromFormat
  • Want more? Feel free to open an issue :cop:
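
As an illustration of what a string-convenience operator like mapStringToKV amounts to, here is a sketch (the delimiter and index parameters are assumptions for illustration, not the library's exact signature):

```javascript
// Hedged sketch of a mapStringToKV-style operator: split a delimited
// string event into a { key, value } record. Parameter defaults assumed.
function mapStringToKV(event, delimiter = " ", keyIndex = 0, valueIndex = 1) {
  const parts = event.split(delimiter);
  return { key: parts[keyIndex], value: parts[valueIndex] };
}

mapStringToKV("user42 clicked");   // → { key: "user42", value: "clicked" }
```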

Stream Action Implementations

  • [x] countByKey
  • [x] sumByKey
  • [x] min
  • [x] max
  • Want more? Feel free to open an issue :cop:

Join Operations

Operation description

KStream Status

  • [x] merge
  • [ ] outerJoin
  • [x] innerJoin
  • [ ] leftJoin
  • [x] branch
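
branch() semantics can be sketched on plain arrays: each predicate yields its own sub-stream, and an event lands in every branch whose predicate matches. This is a simple illustration, not the library's implementation:

```javascript
// Plain-JS sketch of branch(): one output per predicate; events are
// copied into each matching branch.
function branch(events, predicates) {
  return predicates.map(predicate => events.filter(predicate));
}

const [evens, odds] = branch([1, 2, 3, 4], [v => v % 2 === 0, v => v % 2 === 1]);
// evens → [2, 4], odds → [1, 3]
```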

KTable Status

  • [x] merge
  • [ ] outerJoin
  • [ ] innerJoin
  • [ ] leftJoin

KTable <-> KStream Status

  • [x] merge
  • [ ] outerJoin
  • [ ] innerJoin
  • [ ] leftJoin

Window Operations

KStream

  • [x] window
  • [ ] advanced window
  • [ ] rolling window
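
As a rough plain-JS sketch of what a fixed-size (tumbling) window groups, independent of the library (the `ts`/`value` event shape is an assumption for the sketch):

```javascript
// Sketch: assign each timestamped event to a fixed-size time bucket,
// the grouping a tumbling window operation performs.
function tumblingWindows(events, sizeMs) {
  const buckets = new Map();
  for (const { ts, value } of events) {
    const bucketStart = Math.floor(ts / sizeMs) * sizeMs;
    if (!buckets.has(bucketStart)) buckets.set(bucketStart, []);
    buckets.get(bucketStart).push(value);
  }
  return buckets;
}

const windows = tumblingWindows(
  [{ ts: 0, value: "a" }, { ts: 500, value: "b" }, { ts: 1200, value: "c" }],
  1000
);
// windows.get(0) → ["a", "b"], windows.get(1000) → ["c"]
```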

More

Can I use this library yet?

Yes.

Are we ready for production yet?

Probably, yes. :smile:

Even More

Forks or Stars give motivation :bowtie:


kafka-streams CHANGELOG

2020-02-24, Version 5.0.0

  • upgraded dependencies
  • BREAKING: please note that the latest sinek (kafka client) will no longer install the native Kafka client dependency node-rdkafka. If you are using the native client with kafka-streams or kafka-connect, you have to install it manually (see below).

A note on native mode

If you are using the native mode (config: { noptions: {} }), you will have to manually install node-rdkafka alongside kafka-streams. This requires a Node.js version between 9 and 12 and will not work with Node.js >= 13 (last tested with 12.16.1).

On macOS High Sierra / Mojave:

```shell
CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib yarn add --frozen-lockfile node-rdkafka@2.7.4
```

Otherwise:

```shell
yarn add --frozen-lockfile node-rdkafka@2.7.4
```

Please also note: doing this with npm does not work, as it will remove your dependencies. Install Yarn first via `npm i -g yarn`.

2019-07-04, Version 4.12.0

  • upgraded dependencies
  • fixed issue #91
  • added topic extractor example

2019-05-16, Version 4.11.0

  • added continueWith dsl operation

2019-05-13, Version 4.10.0

  • fixed typo that broke window functionality
  • correctly handling rejection of .to() promise call
  • correctly handling produce errors (make sure to subscribe via kafkaStreams.on("error", (error) => {...});)
  • KStream.branch([...]) now returns stream clones that consist of deeply cloned events
  • KStream.clone(cloneObjects = false, cloneDeep = false) now offers optional parameters to clone the stream events, otherwise mutating the origin stream will alter the cloned stream's objects
  • hardened the StreamDSL JSON convenience methods
  • other small refactorings

2019-05-07, Version 4.9.0

  • upgraded dependencies
  • fixed bug where kstream merge was not passing kafkaStreams reference
  • fixed code formatting
  • fixed word-count example
  • fixed produceToTopic example
  • fixed consumeOneProduceTwo example
  • adjusted consumeFromTopic example
  • ported most-subject back to 5.3.0 to fix major window bug
  • fixed window example

2019-09-01, Version 4.8.0

  • fixed bug in type declarations
  • fixed typo in quick-start docu
  • fixed bug in window (createSubject; most-subject switched from async to create..)
  • upgraded dependencies (newest native client)

2018-09-10, Version 4.7.0

  • now referencing new kafka client type (advanced configs) declarations
  • upgrade dependencies (kafka clients)

2018-09-10, Version 4.6.5

  • another fix to the type declaration for the .to() call, sorry

2018-08-10, Version 4.6.4

  • removed ncu as dependency, should not have been added after all
  • allowing "auto" as partition count param for the .to() call
  • upgraded sinek from 6.22.2 to 6.22.3

2018-09-21, Version 4.6.3

  • made topic optional in ts definition for .to() call

2018-09-20, Version 4.6.2

  • shipping fix for a bug in node-sinek

2018-11-09, Version 4.6.1

  • added return type for .to() call in ts definition

2018-10-09, Version 4.6.0

  • added KStream.branch() etl method
  • added typescript declarations
  • updated dependencies:

  • sinek → ~6.22.0
  • proxyquire → ~2.1.0
  • bluebird → ~3.5.2
  • uuid → ~3.3.2

2018-06-27, Version 4.5.0

  • updated dependencies
  • new sinek version brings additional metadata methods

2018-06-15, Version 4.4.0

  • updated dependencies
  • new sinek version brings latest node-rdkafka and kafka-node as well as message value and key buffer encoding for javascript client

2018-05-31, Version 4.3.0

  • added concatMap dsl method
  • added getNewMostFrom method
  • added example consumeOneProduceTwo

2018-05-18, Version 4.2.0

  • added new DSL function most.awaitPromises

2018-04-23, Version 4.1.0

  • added ability to process a different Kafka configuration for producing via .to() if the first parameter is an object. Pass an object to .to({outputKafkaConfig: {}}) (in case you are not starting a consumer with your stream)
  • added ability to process a different Kafka configuration for producing via .start(). Pass an object to .start({outputKafkaConfig: {}}) (works for KStream and KTable) in case you are starting a consumer and producer on the same stream.
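
A hedged sketch of the shape this takes (the broker host and the field names inside the nested config are placeholder assumptions; only the outputKafkaConfig wrapper comes from this entry):

```javascript
// Separate producer config, passed either to .start() (consumer and
// producer on the same stream) or to .to() (no consumer on this stream).
const produceVia = {
  outputKafkaConfig: {
    kafkaHost: "other-broker:9092",   // placeholder broker address
    clientName: "producer-client",    // assumed JS-client style fields
    options: {}
  }
};

// stream.start(produceVia);
// stream.to({ outputKafkaConfig: produceVia.outputKafkaConfig });
```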

2018-04-23, Version 4.0.2

  • fixed typo in messageProduceHandler that caused errors during on demand message production
  • now passing version in messageProduceHandler

2018-04-23, Version 4.0.1

  • fixed an empty produce topic starting a consumer even when not needed

2018-04-20, Version 4.0.0

ADDITIONAL FEATURES:

  • KafkaStreams inherits event emitter ("closed" event)
  • you can now pass multiple topics to the .getKStream() function
  • there is now a .from("topic"|["topics"]) dsl function to subscribe
  • mapJSONConvenience, mapStringValueToJSONObject, mapBufferValueToString, mapBufferKeyToString
  • wrapAsKafkaValue
  • you can now pass batchOptions for the native consumer via config.batchOptions
  • passing batchOptions enforces backpressure mode
  • added setProduceHandler and createAndSetProduceHandler methods to register for message delivery
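
What the buffer/JSON convenience operators amount to, sketched step by step on one raw event in plain JavaScript:

```javascript
// Sketch of the convenience chain on one raw event: decode the value
// buffer to a string, then parse it into a JSON object. mapJSONConvenience
// bundles steps like these.
const raw = { key: Buffer.from("user42"), value: Buffer.from('{"n":1}') };

const decoded = { ...raw, value: raw.value.toString("utf8") };   // ~ mapBufferValueToString
const parsed = { ...decoded, value: JSON.parse(decoded.value) }; // ~ mapStringValueToJSONObject

// parsed.value → { n: 1 }
```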

BREAKING CHANGES:

  • raw events are now streamed with key and value buffers
  • mapParse has been renamed to mapJSONParse
  • mapWrapKafkaPayload has been renamed to mapWrapKafkaValue
  • passing no config object to KafkaStreams will now throw
  • (internal) NativeKafka .send() does not handle array of messages anymore
  • (internal) NativeKafka produce method params have been altered slightly
  • stream events that run through .to will now be checked if they have a message schema, if they do, their fields (key, value, partition, ..) will be remapped to the produce config
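
A rough illustration of that remapping (the exact schema fields live in the kafka message schemes documentation; the field names used here are illustrative only):

```javascript
// Sketch: an event carrying a message schema has its fields lifted into
// the produce parameters instead of being sent verbatim.
function remapForProduce(event) {
  const { key, value, partition, topic } = event;   // illustrative fields
  return { topic, partition, key, message: value };
}

remapForProduce({ key: "k", value: "v", partition: 0, topic: "out" });
```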

Other:

  • dropped the old flaky integration tests
  • added new e2e and unit tests
  • all produce activities have been bundled in messageProduceHandle
  • folder structure has been refactored
  • updated documentation
  • added kafka message schemes documentation

2017-10-03, Version 3.0.0

  • Updated dependencies

  • Ships with new sinek version that brings a second kafka-client via librdkafka

  • split KafkaClient into JSKafkaClient and NativeKafkaClient based on sinek's two clients
  • KafkaFactory (used by KafkaStreams) switches between the clients based on config parameters (see docs/native.md)
  • when using the native client, SSL, SASL and Kerberos are available (see docs/ssl-sasl.md)
  • with the help of the native client, the performance of the library is increased by a magnitude

  • BREAKING CHANGE kafkaStreams.closeAll() returns a promise and awaits the closing of all kafka clients as well as storages

  • the /kafka-setup has been updated with an SSL example

  • documentation has been updated

2017-08-12, Version 2.3.0

  • Minor fixes
  • Updated dependencies

2017-07-12, Version 2.1.0

(sinek update) Kafka Client is now able to connect directly to the Kafka Broker

  • Updated all dependencies
  • Clients can now omit Zookeeper and connect directly to a Broker by omitting zkConStr and passing kafkaHost in the config
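
Two hedged config sketches for the JS client (hosts and names are placeholders): the legacy Zookeeper connection versus the new direct broker connection:

```javascript
// Before: connect via Zookeeper (zkConStr). After: connect directly to a
// broker by omitting zkConStr and passing kafkaHost instead.
const viaZookeeper = { zkConStr: "localhost:2181/", clientName: "client", groupId: "group" };
const directToBroker = { kafkaHost: "localhost:9092", clientName: "client", groupId: "group" };
```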

Producer/Consumer Key Changes #704

  • BREAKING CHANGE: the key is decoded as a string by default (previously it was a Buffer). The preferred encoding for the key can be defined via the keyEncoding option on any of the consumers and will fall back to encoding if omitted

2017-07-11, Version 1.32.0

  • First entry in CHANGELOG.md