lifion / lifion-kinesis

A native Node.js producer and consumer library for Amazon Kinesis Data Streams

License: MIT License

JavaScript 99.81% Shell 0.19%
adp amazon aws big-data client kinesis lifion kinesis-client kinesis-data-streams kinesis-stream

lifion-kinesis's Introduction

lifion-kinesis

Lifion's Node.js client for Amazon Kinesis Data Streams.

Getting Started

To install the module:

npm install lifion-kinesis --save

The main module export is a Kinesis class that instantiates as a readable stream.

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({
  streamName: 'sample-stream'
  /* other options from AWS.Kinesis */
});
kinesis.on('data', data => {
  console.log('Incoming data:', data);
});
kinesis.startConsumer();

To take advantage of back-pressure, the client can be piped to a writable stream:

const { promisify } = require('util');
const Kinesis = require('lifion-kinesis');
const stream = require('stream');

const asyncPipeline = promisify(stream.pipeline);
const kinesis = new Kinesis({
  streamName: 'sample-stream'
  /* other options from AWS.Kinesis */
});

asyncPipeline(
  kinesis,
  new stream.Writable({
    objectMode: true,
    write(data, encoding, callback) {
      console.log(data);
      callback();
    }
  })
).catch(console.error);
kinesis.startConsumer();

Features

  • Standard Node.js stream abstraction of Kinesis streams.
  • Node.js implementation of the new enhanced fan-out feature.
  • Optional auto-creation, encryption, and tagging of Kinesis streams.
  • Support for a polling mode, using the GetRecords API, with automatic checkpointing.
  • Support for multiple concurrent consumers through automatic assignment of shards.
  • Support for sending messages to streams, with auto-retries.

API Reference

Kinesis ⇐ PassThrough

A pass-through stream class specialization implementing a consumer of Kinesis Data Streams using the AWS SDK for JavaScript. Incoming data can be retrieved through either the data event or by piping the instance to other streams.

Kind: Exported class
Extends: PassThrough

new Kinesis(options)

Initializes a new instance of the Kinesis client.

  • options (Object): The initialization options. In addition to the options below, it can also contain any of the AWS.Kinesis options.
  • options.compression (string, optional): The kind of data compression to use with records. The currently available compression options are "LZ-UTF8" or none.
  • options.consumerGroup (string, optional): The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module.
  • options.createStreamIfNeeded (boolean, default: true): Whether the Kinesis stream should be automatically created if it doesn't exist upon connection.
  • options.dynamoDb (Object, default: {}): The initialization options for the DynamoDB client used to store the state of the consumers. In addition to tableName and tags, it can also contain any of the AWS.DynamoDB options.
  • options.dynamoDb.tableName (string, optional): The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state".
  • options.dynamoDb.tags (Object, optional): If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged.
  • options.encryption (Object, optional): The encryption options to enforce in the stream.
  • options.encryption.type (string, optional): The encryption type to use.
  • options.encryption.keyId (string, optional): The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/".
  • options.initialPositionInStream (string, default: "LATEST"): The location in the shard from which the consumer will start fetching records when the application starts for the first time and there is no checkpoint for the shard. Set to "LATEST" to fetch new data only, or to "TRIM_HORIZON" to start from the oldest available data record.
  • options.leaseAcquisitionInterval (number, default: 20000): The interval in milliseconds for how often to attempt lease acquisitions.
  • options.leaseAcquisitionRecoveryInterval (number, default: 5000): The interval in milliseconds for how often to re-attempt lease acquisitions when an error is returned from AWS.
  • options.limit (number, default: 10000): The limit of records per GetRecords call (only applicable when useEnhancedFanOut is set to false).
  • options.logger (Object, optional): An object with warn, debug, and error functions that will be used for logging purposes. If not provided, logging will be omitted.
  • options.maxEnhancedConsumers (number, default: 5): The number of enhanced fan-out consumer ARNs that the module should initialize. Providing a number above the AWS limit (20) or below 1 will result in using the default.
  • options.noRecordsPollDelay (number, default: 1000): The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when useEnhancedFanOut is set to false).
  • options.pollDelay (number, default: 250): When the usePausedPolling option is false, the delay in milliseconds between poll requests for more records (only applicable when useEnhancedFanOut is set to false).
  • options.s3 (Object, default: {}): The initialization options for the S3 client used to store large items in buckets. In addition to bucketName and endpoint, it can also contain any of the AWS.S3 options.
  • options.s3.bucketName (string, optional): The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream.
  • options.s3.largeItemThreshold (number, default: 900): The size in KB above which an item should automatically be stored in S3.
  • options.s3.nonS3Keys (Array.<string>, default: []): If the useS3ForLargeItems option is set to true, the keys that will be sent normally on the Kinesis record.
  • options.s3.tags (string, optional): If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged.
  • options.shardCount (number, default: 1): The number of shards that a newly-created stream will use (if the createStreamIfNeeded option is set).
  • options.shouldDeaggregate (string | boolean, default: "auto"): Whether the method retrieving the records should expect aggregated records and deaggregate them appropriately.
  • options.shouldParseJson (string | boolean, default: "auto"): Whether retrieved records' data should be parsed as JSON. Set to "auto" to only attempt parsing if the data looks like JSON; set to true to force parsing.
  • options.statsInterval (number, default: 30000): The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running.
  • options.streamName (string, required): The name of the stream to consume data from.
  • options.supressThroughputWarnings (boolean, default: false): Set to true to make the client log ProvisionedThroughputExceededException as debug rather than warning.
  • options.tags (Object, optional): If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them.
  • options.useAutoCheckpoints (boolean, default: true): Set to true to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to false, consumers can use the setCheckpoint() function to store any sequence number as the checkpoint for the shard.
  • options.useAutoShardAssignment (boolean, default: true): Set to true to automatically assign the stream shards to the active consumers in the same group (so only one client reads from a given shard at a time). Set to false to make the client read from all shards.
  • options.useEnhancedFanOut (boolean, default: false): Set to true to make the client use enhanced fan-out consumers to read from shards.
  • options.usePausedPolling (boolean, default: false): Set to true to prevent the client from polling for more records until the consumer calls continuePolling(). This option is useful when consumers want to make sure records are fully processed before receiving more (only applicable when useEnhancedFanOut is set to false).
  • options.useS3ForLargeItems (boolean, default: false): Whether to automatically use an S3 bucket to store large items.
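
As an illustration, here is one way the options above can be combined. This is only a sketch; the stream name, group name, and tuning values are placeholders, not recommendations.

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({
  streamName: 'sample-stream', // required
  consumerGroup: 'sample-app', // consumers in this group share shards and checkpoints
  createStreamIfNeeded: true, // auto-create the stream if it doesn't exist
  shardCount: 2, // only used if the stream gets created
  initialPositionInStream: 'TRIM_HORIZON', // start from the oldest available record
  statsInterval: 30000, // emit "stats" every 30 seconds while running
  dynamoDb: { tableName: 'lifion-kinesis-state' },
  logger: console
});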

kinesis.startConsumer() ⇒ Promise

Starts the stream consumer by ensuring that the stream exists, is ready, and is configured as requested. The internal managers that deal with heartbeats, state, and consumers will also be started.

Kind: instance method of Kinesis
Fulfil: undefined - Once the consumer has successfully started.
Reject: Error - On any unexpected error while trying to start.

kinesis.stopConsumer()

Stops the stream consumer. The internal managers will also be stopped.

Kind: instance method of Kinesis
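
For example, the consumer can be stopped when the hosting process shuts down so the internal managers release their resources. A minimal sketch (the SIGTERM handler is just one common place to do this):

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({ streamName: 'sample-stream' });
kinesis.on('data', (data) => console.log('Incoming data:', data));

kinesis
  .startConsumer()
  .then(() => console.log('Consumer started.'))
  .catch((err) => console.error('Failed to start the consumer:', err));

// Stop the consumer (and its internal managers) when the process is asked to exit.
process.on('SIGTERM', () => {
  kinesis.stopConsumer();
  process.exit(0);
});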

kinesis.putRecord(params) ⇒ Promise

Writes a single data record into a stream.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.

  • params (Object): The parameters.
  • params.data (*): The data to put into the record.
  • params.explicitHashKey (string, optional): The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
  • params.partitionKey (string, optional): Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
  • params.sequenceNumberForOrdering (string, optional): Set this to the sequence number obtained from the last put record operation to guarantee strictly increasing sequence numbers, for puts from the same client and to the same partition key. If omitted, records are coarsely ordered based on arrival time.
  • params.streamName (string, optional): If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation.
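
A usage sketch, assuming a client constructed as in the earlier examples; the payload and partition key are illustrative:

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({ streamName: 'sample-stream' });

kinesis
  .putRecord({
    data: { id: 'c2de27bd', msg: 'CONNECTED_ACK' }, // any serializable payload
    partitionKey: 'WEBSOCKET' // optional; derived from a SHA-1 hash of the data when omitted
  })
  .then((result) => console.log('Record written:', result))
  .catch((err) => console.error('Failed to write the record:', err));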

kinesis.listShards(params) ⇒ Promise

Lists the shards of a stream.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while listing the shards.

  • params (Object): The parameters.
  • params.streamName (string, optional): If provided, the method will list the shards of the specified stream instead of the stream name provided during the consumer instantiation.
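
A usage sketch, assuming the resolved object mirrors the AWS ListShards response (a Shards array):

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({ streamName: 'sample-stream' });

kinesis
  .listShards()
  .then((result) => {
    // Assumed response shape: { Shards: [{ ShardId, ... }, ...] }
    (result.Shards || []).forEach((shard) => console.log(shard.ShardId));
  })
  .catch((err) => console.error('Failed to list the shards:', err));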

kinesis.putRecords(params) ⇒ Promise

Writes multiple data records into a stream in a single call.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.

  • params (Object): The parameters.
  • params.records (Array.<Object>): The records associated with the request.
  • params.records[].data (*): The record data.
  • params.records[].explicitHashKey (string, optional): The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
  • params.records[].partitionKey (string, optional): Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
  • params.streamName (string, optional): If provided, the records will be put into the specified stream instead of the stream name provided during the consumer instantiation.
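
A usage sketch for batching; the records and keys are illustrative:

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({ streamName: 'sample-stream' });

kinesis
  .putRecords({
    records: [
      { data: { id: '8003c0e2', msg: 'AUTHENTICATED_ACK' } },
      { data: { id: '8003c0e2', msg: 'SUBSCRIBED_ACK' }, partitionKey: 'WEBSOCKET' }
    ]
  })
  .then((result) => console.log('Batch written:', result))
  .catch((err) => console.error('Failed to write the batch:', err));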

kinesis.getStats() ⇒ Object

Returns statistics for the instance of the client.

Kind: instance method of Kinesis
Returns: Object - An object with the statistics.

Kinesis.getStats() ⇒ Object

Returns the aggregated statistics of all the instances of the client.

Kind: static method of Kinesis
Returns: Object - An object with the statistics.
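
The same statistics can be read on demand or observed through the "stats" event while the consumer is running. A sketch; the event payload is assumed here to be the statistics object, which is not stated explicitly above:

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({ streamName: 'sample-stream', statsInterval: 30000 });

// Per-instance statistics, on demand:
console.log(kinesis.getStats());

// Aggregated statistics across all client instances in this process:
console.log(Kinesis.getStats());

// While the consumer is running, "stats" is emitted every statsInterval milliseconds.
kinesis.on('stats', (stats) => console.log('stats:', stats));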

License

MIT

lifion-kinesis's People

Contributors

aruna770, bizzlepop34, dependabot[bot], duinness, eaviles, guypesto, jennyeckstein, kenzable, pjlevitt, renovate-bot, snyk-bot, supernavix, zfu-lifion

lifion-kinesis's Issues

Events Lost When Consumers Scale from 1 to 2

I have been using this npm module with one consumer instance for a while now and have had no problems with events from AWS Kinesis being lost. However, since my application will be hosted in Kubernetes with horizontal autoscalers, I did some testing with varying consumer amounts to see how it performed. One thing I noticed was that Kinesis events were lost when I went from 1 consumer to 2. Both these consumers were using the same consumer group, but were separate instances of my consumer application. Let me explain.

I would start up one consumer and let it initialize correctly; we'll call this consumer consumer1. I have a data stream provisioned with 4 shards, and the DynamoDB table associated with this consumer was also initialized correctly. I would then use a producer program to ingest documents into the Kinesis data stream at a rate of 1 doc/sec. These documents were labeled with IDs starting from 0 all the way to 300 so that I could keep track. I let consumer1 consume about 50 documents and then initialized a 2nd consumer (consumer2). About 2 minutes after consumer2 initialized, it started receiving documents as well. I let the producer run until the 300 documents had been successfully ingested into the Kinesis data stream, and I let the two consumers run for about 5 minutes after the 300th document was ingested to make sure they were done receiving documents.

After this trial run ends, I print out what document IDs the two consumers received and also what shard those documents came in through. I do the same for the producer to make sure that each ID was successfully inserted into the Kinesis data stream and also to see which shard the producer placed the document into.

When I combine the two consumer lists, there are many document IDs missing. All the missing document IDs come after the ID of 50, meaning that documents start going missing as soon as I introduce the second consumer. When I look at the DynamoDB table, two of the shards have consumer1 as the leaseOwner and the other two shards have consumer2 as the leaseOwner. So the leases were distributed correctly; however, documents just failed to be properly received. I see no processRecord errors in my consumer applications indicating they failed to receive some of these documents. I tried changing the leaseAcquisitionInterval but that didn't help, as I was still losing documents.

Any help on this would be greatly appreciated, as this is one of the only npm modules we can use for consuming Kinesis documents in pure Node.js.

Lease Acquisition Recovery Interval causing lots of AWS rate-limit errors

Currently the ACQUIRE_LEASES_RECOVERY_INTERVAL is hard-coded to fire every 5 seconds if an error is returned when attempting to acquire a lease. This becomes a big problem if an account has a lot of kinesis streams and multiple instances of lifion-kinesis managing them.

My prod account is safe, because it only has about 10 kinesis streams. But my development account covers 10 different testing environments, so it has about 100 streams. Searching my logs for "Unexpected recoverable failure when trying to acquire" returns ~5,000 hits per minute.

I've opened a PR to make the lease acquisition recovery interval configurable.
I ran this code in my test env and set it to 30 seconds. This dropped my error rate from 5,000/min to 20/min.
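
For reference, with the configurable leaseAcquisitionRecoveryInterval option documented above, the reporter's 30-second setting would look roughly like the following sketch; the values are illustrative, not recommendations:

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({
  streamName: 'sample-stream',
  leaseAcquisitionInterval: 20000, // default: attempt lease acquisitions every 20s
  leaseAcquisitionRecoveryInterval: 30000 // back off to 30s after lease-acquisition errors
});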

Version 1.2.2 useEnhancedFanOut causing error

const kinesis = new Kinesis({
    consumerName: 'consumerName',
    streamName: 'streamName',
    useEnhancedFanOut: true,
  });

Setting useEnhancedFanOut to true throws the following error.

UnhandledPromiseRejectionWarning: Error: `input` must not start with a slash when using `prefixUrl`
at normalizeArguments (/app/node_modules/got/dist/source/core/index.js:478:23)
at got (/app/node_modules/got/dist/source/create.js:112:39)
at Function.got.stream (/app/node_modules/got/dist/source/create.js:221:37)
at FanOutConsumer.start (/app/node_modules/lifion-kinesis/lib/fan-out-consumer.js:362:33)
at processTicksAndRejections (internal/process/task_queues.js:93:5)

Can't consume message

Hey guys,
First of all, this package is exactly what we were looking for...
Second, the message consuming is currently not working for me with localstack.
Do you have any experience with it?

My localstack docker-compose.yml

version: '2.1'

services:
  localstack-container:
    container_name: "localstack-container"
    privileged: true
    image: localstack/localstack:0.11.0
    ports:
      - "4566-4599:4566-4599"
      - "8081:8081"
    # https://github.com/localstack/localstack#configurations
    environment:
      - SERVICES=kinesis,dynamodb
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
      - KINESIS_STREAM_SHARDS=1
      - KINESIS_ERROR_PROBABILITY=0.0
      - KINESIS_STREAM_NAME=my-stream
      - DEFAULT_REGION=us-east-1
    volumes:
      - "./localstack/tmp/localstack:/tmp/localstack"
      - "./localstack/tmp/localstack/run/docker.sock:/var/run/docker.sock"

My code:

this.consumer = new KinesisClient({
  streamName: 'my-stream',
  accessKeyId: 'test',
  secretAccessKey: 'test',
  endpoint: 'http://127.0.0.1:4568',
  region: 'us-east-1',
  dynamoDb: {
    region: 'us-east-1',
  },
});

this.consumer.on('data', (data: any) => {
  console.log('Incoming data:', data);
});

await this.consumer.startConsumer();

But nothing happens (and no errors)...
Any idea why?

Exit always blocked after require()

Any time after

require('lifion-kinesis');

is called, the node script will not exit unless forced to. No other functions need to be called to see this behaviour. I've tried running wtfnode, which finds open files, ports, timers, etc., and I get the following output:

[WTF Node?] open handles:
- File descriptors: (note: stdio always exists)
  - fd 1 (tty) (stdio)
  - fd 2 (tty) (stdio)
- Others:
  - MessagePort

MessagePort implies (based on my really limited knowledge) that maybe a worker thread is opened somewhere? At a glance I'm not seeing what could be causing this, or any library you're using that might be using MessagePort or worker threads.

Propagating errors....

First of all, awesome library. Kudos to you all!
One thing I noticed while connecting to a Kinesis stream: I forgot to set the AWS region in the config file.
There were no errors from the library. Everything seemed to work fine, but it would not listen to any events.
Can errors be propagated? Something like this, perhaps:
kinesis.on('error', (err) => { log.error('Kinesis error', err); });

Node version requirement - 16.13.0

In v1.3.2 the node version requirement is as such ">=10.0.0 <16.10.0 || >16.13.0 <17.0.0 || >=17.0.1" which excludes 16.13.0, though this is the current lts (gallium). Can we get this updated to be >=16.13.0?

Thanks!

B Martin
Earnest

Node 16.10.0 causes promise pipeline to not accept array of streams

In the Node v16.10.0 update, passing an array of streams as the parameter to a promisified stream.pipeline causes an error. This has been brought up in nodejs/node#40663.

This issue will be fixed in the next Node v16 release. In the meantime, we can temporarily add a new node engine requirement such that it will warn when detecting node v16.10.0 ~ v17.0.0 inclusively.

Not able to call setCheckpoint()

When setting [useAutoCheckpoints] to false I can see that checkpoints stop being set, but I do not see where/how I should use setCheckpoint() to store the sequence number for a shard.
The setCheckpoint() function doesn't appear to be available outside of the lifion-kinesis library.

I'll start working on a PR for this, but I wanted to check and see if maybe there was something I was missing.

Thanks!

State store and/or DynamoDB client not stopped when stopConsumer() called

Hello, I am running into an issue when running my application in Jest against a local instance of Kinesis (i.e. Kinesalite): Jest detects open handles even when stopConsumer() is called before the end of a test.

Jest did not exit one second after the test run has completed.

This usually means that there are asynchronous operations that weren't stopped in your tests. Consider running Jest with `--detectOpenHandles` to troubleshoot this issue.

I've isolated this just to when lifion-kinesis is set to consume, so I know the issue is when using this library.

My test is set up in this way ->
beforeAll(start kinesalite/dynalite) -> test(startConsumer -> stopConsumer) -> afterAll(stop kinesalite/dynalite)

When I run with https://github.com/Raynos/leaked-handles it checks what's still open and I get the following output -


no of handles 4


tcp stream {
  fd: 22,
  readable: false,
  writable: true,
  address: {},
  serverAddr: null
}

tcp stream {
  fd: 23,
  readable: false,
  writable: true,
  address: {},
  serverAddr: null
}

tcp stream {
  fd: 24,
  readable: true,
  writable: false,
  address: {},
  serverAddr: null
}

unknown handle <ref *1> Socket {
  _events: [Object: null prototype] { error: [Function: errorHandler] },
  _eventsCount: 1,
  _maxListeners: undefined,
  type: 'udp4',
  [Symbol(kCapture)]: false,
  [Symbol(asyncId)]: 722,
  [Symbol(state symbol)]: {
    handle: UDP {
      lookup: [Function: bound lookup4],
      [Symbol(owner)]: [Circular *1]
    },
    receiving: false,
    bindState: 0,
    connectState: 0,
    queue: undefined,
    reuseAddr: undefined,
    ipv6Only: undefined,
    recvBufferSize: undefined,
    sendBufferSize: undefined
  }
}

# ....after some more time ->

  console.error
    2020-05-27T13:32:43-0500 <warn> dynamodb-client.js:135 (Object.onRetry) Trying to recover from AWS.DynamoDB error…
        - Message: connect ECONNREFUSED 127.0.0.1:52005
        - Request ID: undefined
        - Code: NetworkingError (undefined)
        - Table: local-dynamodb-component-test

      at transport (../node_modules/tracer/lib/console.js:87:13)
      at ../node_modules/tracer/lib/console.js:72:3
          at Array.forEach (<anonymous>)
      at logMain (../node_modules/tracer/lib/console.js:71:19)
      at Object._self.<computed> [as warn] (../node_modules/tracer/lib/console.js:149:12)
      at Object.onRetry (../node_modules/lifion-kinesis/lib/dynamodb-client.js:135:16)
      at onError (../node_modules/async-retry/lib/index.js:33:17)

Writable Stream example does not work.

The example in the README does not work.

import Kinesis from 'lifion-kinesis';
import stream, { Writable } from 'stream';

stream.pipeline([
  new Kinesis({streamName:'streamName'}),
  new Writable({
    objectMode: true,
    write(data, encoding, callback) {
      console.log(data);
      callback();
    }
  })
]);

Running the code as-is returns this error: Callback must be a function. Received Writable

I have updated the code to promisify the pipeline function, which resolves the error, but no data is read from the stream.

import Kinesis from 'lifion-kinesis';
import stream, { Writable } from 'stream';
import util from 'util';

const pipeline = util.promisify(stream.pipeline);
pipeline([
  new Kinesis({streamName:'streamName'}),
  new Writable({
    objectMode: true,
    write(data, encoding, callback) {
      console.log(data);
      callback();
    }
  })
]);

Need to be able to adjust ACQUIRE_LEASES_INTERVAL

Related to #312, even with doubling the rate limit to 20/second by switching to describeStreamSummary (which helped tremendously) we are still occasionally hitting the aws rate limit for ReadProvisionedThroughputExceeded in our development aws account.

If we were able to adjust how often the lease acquisition attempts were made we could give ourselves extra headroom as we scale up our use even further.

useAutoCheckpoints and enhanced fan-out consumers

Is manual checkpointing not supported for enhanced fan-out consumers? It looks like the consumer manager only passes the useAutoCheckpoints flag to the traditional consumers, and the enhanced fan-out consumer always checkpoints when the records include the continuation sequence number.

Is this intended/necessary, or just unimplemented and potentially something someone could contribute?

DescribeStream rate limit

Hello!
We started using this library with a base case in our system and since it worked so well we scaled it out with multiple consumers. However, we started seeing "LimitExceededExceptions" being returned from AWS and after investigating, we were hitting the DescribeStream request rate limit of 10 qps per account. Looking into the code we see that it's being called by checkIfStreamExists here every 20 seconds.

This seems to put a hard cap on the number of consumers we can spin up with lifion. Has anyone else reported running into this rate limit before? Are there ways to get this stream data other than from DescribeStream?

Thank you!
James

ConditionalCheckFailedException

The library seems to be working just fine, but the stats indicate that DynamoDB is running into exceptions (about 1 every ~10 seconds). The exception is non-specific and the logs didn't show any errors either.

        {
          message: 'The conditional request failed',
          timestamp: 2019-12-18T20:20:14.937Z,
          code: 'ConditionalCheckFailedException',
          requestId: '5VT6DOA0TTOSJ1NC7T5PQ27HDBVV4KQNSO5AEMVJF66Q9ASUAAJG',
          statusCode: 400
        }

Are these exceptions expected?

Not able to call continuePolling

Hi

We've been using your library and it works awesome, but we are not able to use [options.usePausedPolling].

I've set usePausedPolling to true, but I am not able to call the continuePolling() function on the lifion-kinesis object. Can you please check it?

Thanks in Advance :)

Support for a polling mode, using the GetRecords API

First of all thanks for implementing this library, this looks like a much better option for using Kinesis in Node than the AWS KCL library! The stream-based design is a nice way to expose this to NodeJS clients.

I'm currently investigating for which of our use cases we want to use the enhanced fan-out, and for which ones we'd like to stick to the v1 API. That of course depends on factors like latency requirements, amount of data sent, and costs. One of the other drawbacks of the v2 API right now is that Kinesalite doesn't support running this locally yet, which makes it a lot harder to test and develop with the newer APIs. That could be a reason to still use the v1 API for some use cases.

The README mentions that support for a polling mode using the GetRecords API is an incoming feature; is there currently any work being done on that? I can of course also put some time in to help out on this and try to implement that feature. I'm curious if there are some initial ideas already on how to implement this.

storeShardCheckpoint not catching ConditionalCheckFailedException

We are seeing logs like this every 10 seconds or so:

ConditionalCheckFailedException: The conditional request failed
at Request.extractError (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/protocol/json.js:51:27)
at Request.callListeners (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
at Request.emit (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
at Request.emit (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:683:14)
at Request.transition (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:685:12)
at Request.callListeners (/app/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:116:18)

I noticed that the call to storeShardCheckpoint is not protected by the same try/catch as the other calls, which swallows ConditionalCheckFailedException.

Note that we are running with useEnhancedFanOut=true.

Could this explain what we are seeing?

Buffer issue running sample code

This library looks awesome. I was trying to use it following the example but ran into some issues when actually receiving data. See the output below: it successfully connects and fetches data, but when an event with actual data comes in, it fails.

Checking if the stream "loadtest_notifications_consumer" exists…
The stream status is ACTIVE.
Checking if the "consumer-name" consumer for "loadtest_notifications_consumer" exists…
The stream consumer exists already and is active.
Retrieving the shards for the stream "loadtest_notifications_consumer"…
Creating subscribers for the stream shards using "consumer-name"…
Starting a "consumer-name" subscriber for shard "shardId-000000000000"…
Starting position: LATEST
Starting a "consumer-name" subscriber for shard "shardId-000000000001"…
Starting position: LATEST
The client is now connected.
Subscription to shard is successful.
Event "initial-response" emitted.
Subscription to shard is successful.
Event "initial-response" emitted.
Incoming data: { continuationSequenceNumber: '49593840296553928185147946290399650652941424392134459410',
  millisBehindLatest: 0,
  records: [],
  shardId: 'shardId-000000000001' }
Pipeline closed with error: Error: Expected 22315 bytes in the buffer but got 16341.
    at parse (/node_modules/lifion-aws-event-stream/lib/index.js:54:11)
    at Parser._transform (/node_modules/lifion-aws-event-stream/lib/index.js:203:17)
    at Parser.Transform._read (_stream_transform.js:190:10)
    at Parser.Transform._write (_stream_transform.js:178:12)
    at doWrite (_stream_writable.js:410:12)
    at writeOrBuffer (_stream_writable.js:394:5)
    at Parser.Writable.write (_stream_writable.js:294:11)
    at Transform.ondata (_stream_readable.js:689:20)
    at Transform.emit (events.js:189:13)
    at Transform.EventEmitter.emit (domain.js:441:20)

Here's the code I am running:

const Kinesis = require('lifion-kinesis');
const config = require('../config');
const streamName = 'loadtest_notifications_consumer';
const consumerName = 'consumer-name';
const client = new Kinesis({
    streamName,
    consumerName,
    region: config.AWS_DEFAULT_REGION,
    logger: console
});
client.on('data', (chunk) => {
    console.log('Incoming data:', chunk);
});
client.on('error', (error) => {
    console.log('Error:', error);
});

client.connect();

Thanks for any help!

Expose retryOptions in KinesisClient

As far as I can see retryOptions are hardcoded in the lib. I have a use case where I would like to lower or even set the number of retries to 0. It would be useful to me if retry options were a part of the client config.

Is that something you think would be a good idea or did you not expose it intentionally?
If you agree that it would be an improvement, I can open a PR for it.

[QUESTION] data not received

I'm new to Kinesis so excuse my simple question, but I don't get how the credentials are provided in your sample code.

I have setup a kinesis producer using the amazon-kinesis-client-nodejs sample_producer.

My credentials are provided via AWS.config.loadFromPath; it seems to be working fine, as my stream has ACTIVE status.

I am able to create an instance of your Kinesis class and connect to the stream, but the on('data') callback is never called. I'm using setInterval to send messages to the stream, but nothing is received, and I don't get any error message.

I don't know how to debug this, and there are no error messages; I don't know if I'm connected to the correct stream or not.

Supporting De-aggregation of Records

I am using lifion-kinesis for consuming records which have been produced by FlinkKinesisProducer (which itself uses KPL) into a kinesis stream.

By default the KPL is configured to aggregate messages before flushing (based on config property "AggregationEnabled" being set to true by default), and it seems like lifion-kinesis does not support de-aggregation of these messages.

As an example I am producing 4 simple string messages from my Flink application - each with the same PartitionKey of "WEBSOCKET":

{"id": "c2de27bd", "msg": "DISCONNECTED_ACK"}
{"id": "8003c0e2", "msg": "CONNECTED_ACK"}
{"id": "8003c0e2", "msg": "AUTHENTICATED_ACK"}
{"id": "8003c0e2", "msg": "SUBSCRIBED_ACK"}

And on the receiving side (lifion-kinesis) I receive the following:

{"id": "c2de27bd", "msg": "DISCONNECTED_ACK"}
{"id": "8003c0e2", "msg": "CONNECTED_ACK"}
��
        WEBSOCKET.{"id": "8003c0e2", "msg": "AUTHENTICATED_ACK"}+{"id": "8003c0e2", "msg": "SUBSCRIBED_ACK"}DUJ����R%Ѽ�_B�

(Yes the last one is garbled - obviously printing non-printables)

So I made use of the AWS CLI and pulled the records directly and I get the following:

{
    "Records": [
        {
            "SequenceNumber": "49606579326406038367464048750237457368417136686533705730",
            "ApproximateArrivalTimestamp": 1588670713.454,
            "Data": "eyJpZCI6ICJjMmRlMjdiZCIsICJtc2ciOiAiRElTQ09OTkVDVEVEX0FDSyJ9",
            "PartitionKey": "WEBSOCKET"
        },
        {
            "SequenceNumber": "49606579326406038367464048751263835389269956924578725890",
            "ApproximateArrivalTimestamp": 1588670714.062,
            "Data": "eyJpZCI6ICI4MDAzYzBlMiIsICJtc2ciOiAiQ09OTkVDVEVEX0FDSyJ9",
            "PartitionKey": "WEBSOCKET"
        },
        {
            "SequenceNumber": "49606579326406038367464048751789718120802320615575912450",
            "ApproximateArrivalTimestamp": 1588670714.376,
            "Data": "84mawgoJV0VCU09DS0VUGjIIABoueyJpZCI6ICI4MDAzYzBlMiIsICJtc2ciOiAiQVVUSEVOVElDQVRFRF9BQ0sifRovCAAaK3siaWQiOiAiODAwM2MwZTIiLCAibXNnIjogIlNVQlNDUklCRURfQUNLIn1EVUp/1g6VzFIl0bzcX0KU",
            "PartitionKey": "a"
        }
    ],
... }

As you can see the last 2 records have been aggregated by the KPL, and should be de-aggregated by the consumer.

Of course I can workaround this issue by setting "AggregationEnabled" to "false" - but I think this should be supported within lifion-kinesis.

I am very happy to fix this and submit a pull request if you agree on my assessment above - please let me know.

Consumer picking up records less than that mentioned in limit

The consumer seems to be picking up far fewer records than the limit passed to it in the constructor, even though the stream has many more records than that. I am not able to triangulate the issue.

In record processing I'm performing async operations which wait for completion before picking up the next batch of records.

Automatically deregister enhanced fan-out consumers

I have a consumer group that auto scales and will sometimes need more than the default of 5 enhanced fan-out consumers, sometimes less. If I set maxEnhancedConsumers to 20, all of the consumers are created, but they remain registered even after scaling down my consumer group application.

For example, at the peak of the day, I might have 10 instances of the application running, each using 1 consumer. Overnight, I might have 1 instance of the application running and 9 consumers that aren't used.

Can lifion-kinesis automatically deregister the consumers that are not being used?

Add documentation for DynamoDB setup

Hey,

I was about to try your library, which looks very promising. Could you add some kind of information on how to set up the DynamoDB table and what would be the primary key for it?

I cannot initialize the kinesis consumer

const Kinesis = require('lifion-kinesis');
const streamName = 'testStream';
const logger = require('tracer').console({ level: 'debug' });

const kinesisConsumer = new Kinesis({
  apiVersion: '2013-12-02',
  region: 'eu-central-1',
  streamName,
  createStreamIfNeeded: false,
  useAutoCheckpoints: true,
  maxRetries: 5,
  dynamoDb: {
    apiVersion: '2012-08-10',
    region: 'eu-central-1',
    tableName: 'lifion-kinesis-state',
  },
  useEnhancedFanOut: false,
  logger,
});
kinesisConsumer.on('data', data => {
  console.log('Incoming data:', data);
});
kinesisConsumer.on('error', err => {
  logger.error(err);
});
kinesisConsumer.on('stats', stats => {
  logger.info(stats);
});
kinesisConsumer
  .startConsumer()
  .then(() => {
    logger.info('consumer started');
  })
  .catch(err => logger.error);

2019-09-05T13:21:51+0200 stream.js:143 (ensureStreamExists) Verifying the "testStream" stream exists and it's active…
2019-09-05T13:21:51+0200 stream.js:161 (ensureStreamExists) The stream exists and it's active.
2019-09-05T13:21:51+0200 index.js:300 (Kinesis.startConsumer) Trying to start the consumer…
2019-09-05T13:21:51+0200 table.js:93 (ensureTableExists) Verifying the "lifion-kinesis-state" table exists and it's active…
2019-09-05T13:21:52+0200 table.js:130 (ensureTableExists) The table exists and it's active.

Unfortunately it never arrives at: logger.debug('The consumer is now ready.');

Can you help me?

Get consumer shard assignment details

Hello,

I have been using this library and was wondering if it is possible to get information about the consumers and which shards they are assigned to. I know that this is possible by querying the DynamoDB table, but is there a way to get this information in getStats() or some other call?

Thanks!

Localstack

Is it possible to use lifion-kinesis with localstack?
I'm trying to consume messages produced to my localstack kinesis but I'm unable to. Any example maybe, if it's possible?

const Kinesiss = require('lifion-kinesis');

const kinesisConsumer = new Kinesiss({
  apiVersion: '2013-12-02',
  // createStreamIfNeeded: false,
  // useAutoCheckpoints: true,
  // maxRetries: 5,
  // useEnhancedFanOut: false,
  streamName: config.sampleProducer.stream,
  region: config.kinesis.region,
  endpoint: config.kinesis.endpoint,
  secretAccessKey: 'temp',
  accessKeyId: 'temp',
  logger: console

  /* other options from AWS.Kinesis */
});

kinesisConsumer.on('data', data => {
  console.log('Incoming data:', data);
});
kinesisConsumer.on('error', err => {
  console.log(err)
});
kinesisConsumer.on('stats', stats => {
  console.log(stats)
});

kinesisConsumer
  .startConsumer()
  .then(() => {
    console.log('STARTED')
  })
  .catch((e) => console.log('Err111 ', e));

It just hangs

Verifying the "TEST_STREAM" stream exists and it's active…
The stream exists and it's active.
Trying to start the consumer…
Verifying the "lifion-kinesis-state" table exists and it's active…

JSON Parsing should be optional

Parsing JSON should not be part of the responsibility of the consumer. First of all, it tries to parse JSON if the data looks like it might possibly be JSON. If that rough assumption is wrong, it throws a parser error, which is not expected from a library user's perspective. There are multiple errors that can occur (syntax errors, out of memory) that a consumer user might want to deal with specifically. The consumer user might also want to use more optimized JSON parsers if they know the schema.

I would like to suggest that either the consumer code should NOT parse the JSON automatically or that it should be optional.

data = JSON.parse(data);
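
If opting out is what's wanted, the shouldParseJson option documented above can already disable the automatic parse; the following is a rough sketch of doing the parsing on the consumer side instead. The record shape inside the data event is assumed here, not guaranteed:

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({
  streamName: 'sample-stream',
  shouldParseJson: false // hand raw record data to the consumer instead of parsing it
});

kinesis.on('data', (chunk) => {
  // Assumed chunk shape: { records: [{ data, ... }], shardId, ... } as in the logs above.
  for (const record of chunk.records || []) {
    try {
      console.log(JSON.parse(record.data)); // parse with your own error handling
    } catch (err) {
      console.error('Record data was not valid JSON:', err);
    }
  }
});

kinesis.startConsumer();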

Issue connecting to Kinesalite despite existing stream?

I am attempting to build a jest test around Kinesalite. We are using lifion-kinesis to connect to the AWS Kinesis streams in AWS, but for purposes of the component test want to use Kinesalite to isolate the program under test from its infrastructure.

I didn't want to have to modify the program, and figured providing the localhost endpoint in the configuration would work OK... and it seems to recognize it in the ensureStreamExists function, but when kinesis.startConsumer() is called it fails with the below error:

Seems like a bug for it to be recognized but fail anyway.

stream.js:143 (ensureStreamExists) Verifying the "local-kinesis-component-test" stream exists and it's active…
stream.js:161 (ensureStreamExists) The stream exists and it's active.
consumer.ts:34 (xxx) ResourceNotFoundException: Stream local-kinesis-component-test under account 000000000000 not found.
      at Request.extractError (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/protocol/json.js:51:27)
        at Request.callListeners (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
        at Request.emit (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
        at Request.emit (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:683:14)
        at Request.transition (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:22:10)
        at AcceptorStateMachine.runTo (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/state_machine.js:14:12)
        at /node_modules/lifion-kinesis/node_modules/aws-sdk/lib/state_machine.js:26:10
        at Request.<anonymous> (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:38:9)
        at Request.<anonymous> (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/request.js:685:12)
        at Request.callListeners (/node_modules/lifion-kinesis/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
      message: 'Stream local-kinesis-component-test under account 000000000000 not found.',
      code: 'ResourceNotFoundException',
      time: 2020-05-25T17:48:38.164Z,
      requestId: 'f694e540-9eaf-11ea-b2d8-8f0dd26471d6',
      statusCode: 400,
      retryable: false,
      retryDelay: 29.977332596696904

Multiple enhanced fanout issue

Hello,
We are seeing an issue while running the lifion-kinesis consumer with enhanced fanout enabled. We are unable to get multiple processes to consume from the same shard. When enhanced fanout is disabled, this works correctly, but with fanout enabled, only one of the processes is able to consume events from our shard. Here is some info on our setup:

lifion version: 1.0.8
Node.js version: 10.15.3
number of consumers: 2
number of shards: 1

Here is how we initialize our consumer:

const headerKinesis = new Kinesis({
    streamName: HEADERS_STREAM,
    consumerGroup: instanceId + "_" + pid,
    dynamoDb: {
      region: "us-east-1",
    },
    region: "us-east-1",
    logger,
  });

The goal is to have each consumer receive every event from the single shard, which is why we set a unique consumerGroup for each consumer.

When running, we see the same fan-out consumer in the logs of both consumers:
Using the "lifion-kinesis-e8xKvnt2XXDmRkDjp53quz" enhanced fan-out consumer.
but only one of the processes is able to receive events from this shard.

Is there a reason we are unable to receive the same kinesis events in all the processes? Is there an issue with our configuration? Any help or insight would be greatly appreciated! Let me know if any additional information would be helpful as well.

Thanks,
James

lifion stops fetching information from stream

This tool of yours has been behaving admirably given the load we're pushing through it, but occasionally data from a kinesis stream will stop being fetched.

The behavior where no data is fetched from a stream has been rare, but we've seen it a handful of times, and we have not been able to reproduce this behavior on demand. So far I've only seen it happen on 1 stream at a time. Others are still fetching just fine, but one will just stop even though there is more data actively being written to the stream.
Our current work-around is to cycle the containers lifion-kinesis is running on. When the tool comes back up and re-establishes the connection with kinesis it picks up where it left off.

Currently we're running 6 instances of lifion-kinesis, each in its own docker container.
Those 6 instances are monitoring ~50 kinesis streams, each stream with 1 shard.
We have also seen this happen when there is only 1 instance of lifion-kinesis.

I checked the dynamo-db created by lifion. I searched for the particular stream having an issue and found this in the 'shards' column showing that the shard is not depleted.
{ "shardId-000000000000" : { "M" : { "checkpoint" : { "S" : "49603632995838275052247271146585552269824211027765493762" }, "depleted" : { "BOOL" : false }, "leaseExpiration" : { "S" : "2020-05-22T18:30:24.553Z" }, "leaseOwner" : { "S" : "f9aV5Y9hCHisRSATtYjrGH" }, "parent" : { "NULL" : true }, "version" : { "S" : "cd6em5Roj5GzuMMa222iuB" } } }}

The 'consumers' column showed all 6 consumers with current heartbeats.

I'm not seeing clues from logging.
I thought I was really on to something with this error recovery message, but I see the same logs even when things are working just fine.
Trying to recover from AWS.Kinesis error…\n\t- Message: Rate exceeded for stream eventing-INT-intachievewritingeventstream-31476 under account 999447569257.\n\t- Request ID: ff97d4f3-f16c-9d0f-a8d5-fec9744330b3\n\t- Code: LimitExceededException (400)\n\t- Stream: eventing-INT-intachievewritingeventstream-31476

Have you experienced this behavior in your use?

Consumer stops responding after heavy load

Hello,
We are trying to identify an issue where our consumer seems to stop responding to records from the kinesis stream. We have been able to reproduce this issue under heavy load where about 20k records are pushed to the kinesis stream. The consumer looks to start processing/handling the records and then stops responding within about a minute. The consumer appears to be active on the AWS console and we do not see any errors from the error event from the piped stream. I have verified using AWS CLI that I can send an event to the stream and retrieve it with get-records call.
We are logging the Kinesis.getStats() periodically and you can see below how there appears to be communication with AWS but not getting any more records:

"streams":{"eventing-INT-test-stream":{"lastAwsResponse":"2019-08-28T22:05:15.776Z","lastRecordConsumed":"2019-08-28T20:13:18.471Z"}

We did notice some errors in the getStats() call - appear to be dynamodb related. there are ~60:
"message":"The conditional request failed","timestamp":"2019-08-28T22:00:35.426Z","code":"ConditionalCheckFailedException","requestId":"8KOSIR8JKEJSDR9AIE10E6T32JVV4KQNSO5AEMVJF66Q9ASUAAJG","statusCode":400

another log about the consumer that should be the active one for this stream:
"Using the "lifion-kinesis-48UjgDhJNWAenVaa6UztvR" enhanced fan-out consumer.","time":"2019-08-28T20:17:36.372Z"
That's the last time we see records for that consumer ID.

Here is the log that is when the consumer first retrieves the backlog of records:
"Got 10000 record(s) from "shardId-000000000000" (1940000ms behind)","time":"2019-08-28T20:13:17.010Z","v":0}

Here is an AWS monitor of the consumer screenshot where it looks to have stopped responding:
(screenshot)

Been digging into this pretty heavy and hoping you guys could help point us in the right direction.

Thank you
