zio-kinesis's Introduction

Use and like this library? Consider sponsoring its ongoing development and maintenance.

ZIO Kinesis

ZIO Kinesis is a ZIO-based interface to Amazon Kinesis Data Streams for consuming and producing records. A Future-based version of some of the functionality is also available.

The project is in the beta stage. Although already used in production by a small number of organisations, expect some issues to pop up and some changes to the interface. More beta users and feedback are of course welcome.

Features

The library consists of:

  • Consumer
    A ZStream interface to Kinesis streams, including checkpointing, lease coordination and metrics. This is a ZIO-native client built on top of the AWS SDK (via zio-aws), designed for compatibility with KCL clients. Both polling and enhanced fanout (HTTP2 streaming) are supported.

  • Producer
    Put records efficiently and reliably on Kinesis while respecting Kinesis throughput limits. Features batching and failure handling.

  • DynamicConsumer
    An alternative to Consumer, a wrapper around the AWS Kinesis Client Library (KCL).
    NOTE: Although DynamicConsumer will be included in this library for some time to come, it will eventually be deprecated and removed in favour of the ZIO-native Consumer. Users are advised to migrate to Consumer.

zio-kinesis is built on top of zio-aws, a library of automatically generated ZIO wrappers around AWS SDK methods.

Installation

Add to your build.sbt:

libraryDependencies += "nl.vroste" %% "zio-kinesis" % "<version>"

The latest version is built against and requires ZIO v2.0.2.

Consumer

Consumer offers a fully parallel streaming interface to Kinesis Data Streams.

Features:

  • Record streaming from multiple shards in parallel
  • Multiple worker support: lease rebalancing / stealing / renewing (compatible with KCL)
  • Fetching via polling or enhanced fanout (HTTP2 streaming)
  • Deserialization of records to any data type
  • Checkpointing of records according to user-defined Schedules
  • Automatic checkpointing at shard stream shutdown due to error or interruption
  • Handling changes in the number of shards (resharding) while running
  • Support for protobuf-aggregated records (KPL / KCL compatible)
  • Correct handling of Kinesis resource limits (throttling and backoff)
  • KCL compatible metrics publishing to CloudWatch
  • Compatibility for running alongside KCL consumers
  • Emission of diagnostic events for custom logging / metrics / testing
  • Manual or otherwise user-defined shard assignment strategy
  • Pluggable lease/checkpoint storage backend
  • Optimized startup + shutdown sequence

Basic usage using consumeWith

For the many use cases where you simply want to do something with all messages on a Kinesis stream, zio-kinesis provides the convenience method Consumer.consumeWith. It lets you execute a ZIO effect for each message while retaining features like parallel shard processing, checkpointing and resharding.

import nl.vroste.zio.kinesis.client.serde.Serde
import zio.Console.printLine
import zio._

/**
 * Basic usage example for `Consumer.consumeWith` convenience method
 */
object ConsumeWithExample extends ZIOAppDefault {
  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    Consumer
      .consumeWith(
        streamIdentifier = "my-stream",
        applicationName = "my-application",
        deserializer = Serde.asciiString,
        workerIdentifier = "worker1",
        checkpointBatchSize = 1000L,
        checkpointDuration = 5.minutes
      )(record => printLine(s"Processing record $record"))
      .provideLayer(Consumer.defaultEnvironment)
      .exitCode
}

More advanced usage

If you want more fine-grained control over the processing stream, error handling or checkpointing, use Consumer.shardedStream to get a stream of shard streams, as in the following example:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio.Console.printLine
import zio._

object NativeConsumerBasicUsageExample extends ZIOAppDefault {
  override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
    Consumer
      .shardedStream(
        streamIdentifier = "my-stream",
        applicationName = "my-application",
        deserializer = Serde.asciiString,
        workerIdentifier = "worker1"
      )
      .flatMapPar(Int.MaxValue) { case (shardId, shardStream, checkpointer) =>
        shardStream
          .tap(record => printLine(s"Processing record ${record} on shard ${shardId}"))
          .tap(checkpointer.stage(_))
          .viaFunction(checkpointer.checkpointBatched[Any](nr = 1000, interval = 5.minutes))
      }
      .runDrain
      .provideLayer(Consumer.defaultEnvironment)
      .exitCode
}

Let's go over some particulars of this example:

  • Consumer.shardedStream is a stream of streams. Each inner stream represents a Kinesis shard and is emitted along with the shard ID and a Checkpointer. The shard streams are processed in parallel.
  • The deserializer parameter takes care of deserializing records from bytes to a data type of your choice, in this case an ASCII string. You can easily define custom (de)serializers for, for example, JSON data using a JSON library of your choice; see the sketch after this list.
  • After processing the record (here: printing to the console), the record is staged for checkpointing. This is useful to ensure that when the shard stream is interrupted or fails, a checkpoint call is made.
  • The checkpointBatched method on the Checkpointer is a helper that checkpoints after every 1000 records or every 5 minutes, whichever comes first; at the end of each window, the last staged record is checkpointed. It also takes care of ending the shard stream when the lease has been lost or some other checkpointing error occurs. The type parameter (here Any) is unfortunately necessary for correct type inference.
  • runDrain will run the stream until the application is interrupted or until the stream fails.
  • provideLayer provides the environment (dependencies via ZLayer) necessary to run the Consumer. The default environment uses AWS SDK default settings (i.e. the default credential provider).
  • exitCode maps the failure or success of the stream to a system exit code.
  • A logging environment is provided for debug logging. See zio-logging for more information on how to customize this.
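
For illustration, here is a minimal sketch of a custom JSON serde. The Order type, the use of zio-json and the inmapM combinator (modelled on zio-kafka, which inspired this library's Serde construct, see Credits) are assumptions for the sake of the example, not documented zio-kinesis API:

import nl.vroste.zio.kinesis.client.serde.Serde
import zio._
import zio.json._ // assumption: any JSON library works similarly

// Hypothetical data type with a derived zio-json codec
case class Order(id: String, amount: BigDecimal)
object Order {
  implicit val codec: JsonCodec[Order] = DeriveJsonCodec.gen[Order]
}

// Build a Serde[Any, Order] on top of the built-in string serde. The
// inmapM combinator is assumed to work as in zio-kafka's Serde.
val orderSerde: Serde[Any, Order] =
  Serde.asciiString.inmapM(s =>
    ZIO.fromEither(s.fromJson[Order]).mapError(new RuntimeException(_))
  )(order => ZIO.succeed(order.toJson))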

Checkpointing

Checkpointing is a mechanism that ensures that stream processing can be resumed correctly after application restart. Checkpointing has 'up to and including' semantics, which means that checkpointing records with sequence number 100 means acknowledging processing of records 1 to 100.

It is recommended not to checkpoint too frequently; what constitutes a good checkpoint frequency (in terms of number of records and/or interval) depends on your application and stream volume.

zio-kinesis has some mechanisms to improve checkpointing safety in the case of interruption or failures:

  • To guarantee that the last processed record is checkpointed when the stream shuts down, because of failure or interruption for example, checkpoints for every record should be staged by calling checkpointer.stage(record). A periodic call to checkpointer.checkpoint will 'flush' the last staged checkpoint.

  • To ensure that processing of a record is always followed by a checkpoint stage, even in the face of fiber interruption, use the utility method Checkpointer.stageOnSuccess(processingEffect)(r).

  • When checkpointing fails, it is retried according to the provided Schedule. By default this is an exponential backoff schedule with a maximum number of attempts.

  • Taking into account retries, checkpointing may fail due to:

    • ShardLeaseLost: this indicates that while attempting to update the lease, it was discovered that another worker has stolen the lease. The shard should no longer be processed, which can be achieved by recovering the stream with ZStream.empty via catchAll, as sketched after this list.
    • Transient connection / AWS issues. Checkpointer.checkpoint will fail with a Throwable.
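
A minimal sketch of handling both failure modes around a manual checkpoint, to be placed inside the shardedStream flatMapPar from the earlier example. The processRecord effect is hypothetical, and the Either-based error channel and import path of ShardLeaseLost are assumptions based on the description above:

import nl.vroste.zio.kinesis.client.zionative.ShardLeaseLost
import zio.stream.ZStream

shardStream
  // align the stream's error type with checkpoint's Either-based failures
  .mapError[Either[Throwable, ShardLeaseLost.type]](Left(_))
  // stage a checkpoint only after successful processing, interruption-safe
  .tap(record => checkpointer.stageOnSuccess(processRecord(record))(record))
  // flush the last staged checkpoint (per record, purely for illustration)
  .tap(_ => checkpointer.checkpoint)
  .catchAll {
    case Right(ShardLeaseLost) => ZStream.empty   // lease stolen: end this shard's stream
    case Left(e)               => ZStream.fail(e) // transient failure, already retried per the Schedule
  }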

Lease coordination

Consumer supports checkpointing and lease coordination between multiple workers. Leases and checkpoints for each shard are stored in a table (DynamoDB by default). These leases are periodically renewed, unless they were checkpointed recently (within the renewal interval).

When a new worker joins, it will take some leases from other workers so that all workers get a fair share. The other workers will notice this either during checkpointing or lease renewal and will end their processing of the stream. Because of this, records are processed 'at least once'; for a brief time (at most the renew interval) two workers may be processing records from the same shard. When your application has multiple workers, it must be able to handle records being processed more than once because of this.

When one worker fails or loses connectivity, the other workers will detect that the lease has not been updated for some time and will take over some of its leases, so that all workers have a fair share again.

When a worker is stopped, its leases are released so that other workers may pick them up.

Resharding

Changing the stream's shard count, or resharding, is fully supported while the consumer is active. When a shard is split or two shards are merged, the parent shard(s) are processed to their end before processing of the new shards starts. When a worker detects the end of a shard it is processing, Kinesis tells it the new (child) shards, and their processing starts as soon as all parent shards have been completely processed.

When another worker is processing one of the parent shards, it may take a while for this to be detected.

To ensure that no shard is left behind, the list of shards is refreshed periodically.

Consuming multiple Kinesis streams

If you want to process more than one Kinesis stream, simply create one Consumer instance per stream and ensure that the application name is unique per stream. The application name is used to name the lease table, so unique application names give one lease table per stream, ensuring that shard IDs do not conflict. Unlike the KCL, which makes heavy use of threads, there is no performance-related need for multi-stream support built into Consumer.

For example, if your application name is "order_processing" and you want to consume the streams "orders" and "payments", create one Consumer with the application name "order_processing_orders" and another with "order_processing_payments".
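
A minimal sketch of the above, assuming the remaining consumeWith parameters (worker identifier, checkpoint settings) keep their defaults:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio.Console.printLine

// One Consumer per stream, each with its own application name and
// therefore its own lease table
val orders =
  Consumer.consumeWith(
    streamIdentifier = "orders",
    applicationName = "order_processing_orders",
    deserializer = Serde.asciiString
  )(record => printLine(s"Order: $record"))

val payments =
  Consumer.consumeWith(
    streamIdentifier = "payments",
    applicationName = "order_processing_payments",
    deserializer = Serde.asciiString
  )(record => printLine(s"Payment: $record"))

// Run both consumers in parallel against the same default environment
val program = (orders zipPar payments).provideLayer(Consumer.defaultEnvironment)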

Customization

The following parameters can be customized (a sketch follows this list):

  • Initial position
    When no checkpoint is found for a shard, start from this position in the stream. The default is to start with the oldest message on each shard (TRIM_HORIZON).
  • Use enhanced fanout or polling
    See the AWS docs
  • Polling:
    • Maximum batch size
    • Poll interval
    • Backoff schedule in case of throttling (Kinesis limits)
    • Retry schedule in case of other issues
  • Enhanced fanout:
    • Maximum shard subscriptions to make per second.
      Although there is no Kinesis limit, this prevents a stream of 1000 shards from making 1000 simultaneous HTTP requests.
    • Retry schedule in case of issues
  • Lease coordination
    • Lease expiration time
    • Lease renewal interval
    • Lease refresh & take interval
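
A minimal sketch of passing such customizations to shardedStream. The parameter and type names (initialPosition, InitialPosition, fetchMode, FetchMode, batchSize) and their import location are assumptions based on the list above; check the shardedStream signature for the actual API:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.{ Consumer, FetchMode, InitialPosition }

val customizedStream =
  Consumer.shardedStream(
    streamIdentifier = "my-stream",
    applicationName = "my-application",
    deserializer = Serde.asciiString,
    // start from the oldest record when no checkpoint exists (the default)
    initialPosition = InitialPosition.TrimHorizon,
    // use polling instead of enhanced fanout, with a custom batch size
    fetchMode = FetchMode.Polling(batchSize = 1000)
  )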

Diagnostic events & metrics

Consumer.shardedStream has a parameter for a function that is called with a DiagnosticEvent whenever something of interest happens in the Consumer. This is useful for diagnostic purposes and for metrics.

A KCL compatible CloudWatch metrics publisher is included, which can optionally be hooked on to these diagnostic events.
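
A minimal sketch of hooking into these events, assuming the parameter is named emitDiagnostic and accepts a DiagnosticEvent => UIO[Unit] function:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio._

val monitoredStream =
  Consumer.shardedStream(
    streamIdentifier = "my-stream",
    applicationName = "my-application",
    deserializer = Serde.asciiString,
    // log every diagnostic event at debug level
    emitDiagnostic = event => ZIO.logDebug(s"Diagnostic event: $event")
  )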

See core/src/test/scala/nl/vroste/zio/kinesis/client/examples/NativeConsumerWithMetricsExample.scala for an example.

KCL compatibility

Lease coordination and metrics are fully compatible with running alongside other KCL workers.

Unsupported features

Features that are supported by DynamicConsumer but not by Consumer:

  • DynamoDB lease table billing mode configuration
    This can be adjusted in the AWS Console if desired, or manually using the AWS DynamoDB SDK.
  • Some metrics
    Not all metrics published by the KCL are implemented yet. Some of them are not applicable because of different implementations.

Configuration

The default environments for Client, AdminClient, Consumer, DynamicConsumer and Producer will use the Default Credential/Region Provider. Using the client builders, many parameters can be customized. Refer to the AWS documentation for more information on the possible parameters.

Default client ZLayers for production are provided in the nl.vroste.zio.kinesis.client package object; layers for integration tests are provided in LocalStackServices.

Producer

The low-level Client offers a putRecords method to put records on Kinesis. Although simple to use for a small number of records, there are many catches when it comes to efficiently and reliably producing a high volume of records.

Producer helps you achieve high throughput by batching records and respecting the Kinesis request (rate) limits. Records that cannot be produced due to temporary errors, like shard rate limits, will be retried.

All of this of course with the robust failure handling you can expect from a ZIO-based library.

Usage example:

import nl.vroste.zio.kinesis.client
import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.{ Producer, ProducerRecord }
import zio.Console.printLine
import zio._

object ProducerExample extends ZIOAppDefault {
  val streamIdentifier = "my_stream"
  val applicationName  = "my_awesome_zio_application"

  val env = client.defaultAwsLayer ++ Scope.default

  val program = Producer.make(streamIdentifier, Serde.asciiString).flatMap { producer =>
    val record = ProducerRecord("key1", "message1")

    for {
      _ <- producer.produce(record)
      _ <- printLine(s"All records in the chunk were produced")
    } yield ()
  }

  override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
    program.provideLayer(env).exitCode
}

Aggregation

Each shard has an ingestion limit of 1 MB/s and 1000 records/s. When your records are small, you may not reach the 1 MB/s limit, but you will still be limited by the 1000 records/s.

Producer can aggregate multiple user records into one Kinesis record to optimize usage of the shard capacity. Consumer and DynamicConsumer can automatically deaggregate these records transparently to the user. Checkpointing within an aggregate is supported as well.

Aggregation is off by default but can be enabled by setting ProducerSettings.aggregate to true.
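
A minimal sketch of enabling it, assuming ProducerSettings is constructed as in the metrics example below:

import nl.vroste.zio.kinesis.client.{ Producer, ProducerSettings }
import nl.vroste.zio.kinesis.client.serde.Serde

// aggregate = true turns on KPL-compatible record aggregation
val aggregatingProducer =
  Producer.make(
    "my_stream",
    Serde.asciiString,
    ProducerSettings(aggregate = true)
  )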

This feature is fully compatible with the KPL and KCL.

Metrics

Producer periodically collects metrics like success rate and throughput and makes them available as ProducerMetrics values. Statistical values are collected in a HdrHistogram. Metrics are collected every 30 seconds by default, but the interval can be customized.

ProducerMetrics objects can be combined with other ProducerMetrics to get (statistically sound!) total metrics, allowing you to do your own filtering, aggregation or other processing if desired.

The list of available metrics is:

  • Throughput (records/s)
  • Success rate
  • Latency distribution
  • Nr of records published
  • Nr of failures
  • Nr of attempts distribution
  • Nr of PutRecords calls
  • Record payload size distribution
  • Batch payload size distribution

Example usage:

import nl.vroste.zio.kinesis.client
import nl.vroste.zio.kinesis.client.producer.ProducerMetrics
import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.{ Producer, ProducerRecord, ProducerSettings }
import zio.Console.printLine
import zio._

object ProducerWithMetricsExample extends ZIOAppDefault {
  val streamIdentifier = "my_stream"
  val applicationName  = "my_awesome_zio_application"

  val env = client.defaultAwsLayer ++ Scope.default

  val program = (for {
    totalMetrics <- Ref.make(ProducerMetrics.empty)
    producer     <- Producer
                      .make(
                        streamIdentifier,
                        Serde.asciiString,
                        ProducerSettings(),
                        metrics => totalMetrics.updateAndGet(_ + metrics).flatMap(m => printLine(m.toString).orDie)
                      )
  } yield (producer, totalMetrics)).flatMap { case (producer, totalMetrics) =>
    val records = (1 to 100).map(j => ProducerRecord(s"key${j}", s"message${j}"))

    for {
      _ <- producer.produceChunk(Chunk.fromIterable(records))
      _ <- printLine(s"All records in the chunk were produced").orDie
      m <- totalMetrics.get
      _ <- printLine(s"Metrics after producing: ${m}").orDie
    } yield ()
  }

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = program.provideLayer(env).exitCode
}

Future-based interface

For cases where you need to integrate with existing Future-based application code, Consumer and Producer are also available with a Scala Future-based interface.

Producer offers full functionality while Consumer offers only consumeWith, the easiest way of consuming records from Kinesis.

To use, add the following to your build.sbt:

libraryDependencies += "nl.vroste" %% "zio-kinesis-future" % "<version>"

Consumer and Producer are now available in the nl.vroste.zio.kinesis.interop.futures package.

Producer can be used as follows:

import nl.vroste.zio.kinesis.client.ProducerRecord
import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.interop.futures.Producer

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ProducerExample extends App {
  val producer = Producer.make[String]("my-stream", Serde.asciiString, metricsCollector = m => println(m))

  val done = Future.traverse((1 to 10).toList) { i =>
    producer.produce(ProducerRecord("key1", s"msg${i}"))
  }

  Await.result(done, 30.seconds)

  producer.close()
}
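
The Future-based Consumer offers only consumeWith, as noted above. A minimal sketch, assuming its parameters mirror the ZIO version's:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.interop.futures.Consumer

// Signature and return type are assumptions; only consumeWith is available
val consumer = Consumer.consumeWith(
  streamIdentifier = "my-stream",
  applicationName = "my-application",
  deserializer = Serde.asciiString
)(record => println(s"Processing record $record"))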

DynamicConsumer

DynamicConsumer is an alternative to Consumer, backed by the Kinesis Client Library (KCL).

NOTE: Although DynamicConsumer will be included in this library for some time to come for backwards compatibility, it will eventually be deprecated and removed in favour of the ZIO-native Consumer. Users are advised to migrate to Consumer.

The interface is largely the same as Consumer, except for:

  • Some parameters for configuration
  • The ZIO environment
  • Checkpointing is an effect that can fail with a Throwable instead of Either[Throwable, ShardLeaseLost.type]. A ShutdownException indicates that the shard should no longer be processed. See the documentation on Checkpointer.checkpoint for more details.
  • Retrying is not done automatically by DynamicConsumer's Checkpointer.

Unlike Consumer, DynamicConsumer also supports:

  • Full CloudWatch metrics publishing

(KPL record aggregation via Protobuf, including subsequence-number checkpointing, is meanwhile supported by Consumer as well; see Features and Aggregation.)

Basic usage using consumeWith

For the many use cases where you simply want to do something with all messages on a Kinesis stream, zio-kinesis provides the convenience method DynamicConsumer.consumeWith. It lets you execute a ZIO effect for each message and takes care of checkpointing, which you can configure through the checkpointBatchSize and checkpointDuration parameters.

import nl.vroste.zio.kinesis.client.defaultAwsLayer
import nl.vroste.zio.kinesis.client.dynamicconsumer.DynamicConsumer
import nl.vroste.zio.kinesis.client.serde.Serde
import zio.Console.printLine
import zio._

/**
 * Basic usage example for `DynamicConsumer.consumeWith` convenience method
 */
object DynamicConsumerConsumeWithExample extends ZIOAppDefault {
  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    DynamicConsumer
      .consumeWith(
        streamIdentifier = "my-stream",
        applicationName = "my-application",
        deserializer = Serde.asciiString,
        workerIdentifier = "worker1",
        checkpointBatchSize = 1000L,
        checkpointDuration = 5.minutes
      )(record => printLine(s"Processing record $record"))
      .provideLayer(defaultAwsLayer >+> DynamicConsumer.live)
      .exitCode
}

DynamicConsumerFake

Often it's extremely useful to test your consumer logic without the overhead of a real or localstack Kinesis. To this end a fake ZLayer implementation of DynamicConsumer is provided, accessible via DynamicConsumer.fake.

Note that this also provides full checkpointing functionality, which can be tracked via a Ref passed into the refCheckpointedList parameter.

import nl.vroste.zio.kinesis.client.dynamicconsumer.DynamicConsumer
import nl.vroste.zio.kinesis.client.dynamicconsumer.DynamicConsumer.Record
import nl.vroste.zio.kinesis.client.dynamicconsumer.fake.DynamicConsumerFake
import nl.vroste.zio.kinesis.client.serde.Serde
import zio.Console.printLine
import zio.stream.ZStream
import zio._

/**
 * Basic usage example for `DynamicConsumerFake`
 */
object DynamicConsumerFakeExample extends ZIOAppDefault {
  private val shards: ZStream[Any, Nothing, (String, ZStream[Any, Throwable, Chunk[Byte]])] =
    DynamicConsumerFake.shardsFromStreams(Serde.asciiString, ZStream("msg1", "msg2"), ZStream("msg3", "msg4"))

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    for {
      refCheckpointedList <- Ref.make[Seq[Record[Any]]](Seq.empty)
      exitCode            <- DynamicConsumer
                               .consumeWith(
                                 streamIdentifier = "my-stream",
                                 applicationName = "my-application",
                                 deserializer = Serde.asciiString,
                                 workerIdentifier = "worker1",
                                 checkpointBatchSize = 1000L,
                                 checkpointDuration = 5.minutes
                               )(record => printLine(s"Processing record $record").orDie)
                               .provideLayer(DynamicConsumer.fake(shards, refCheckpointedList))
                               .exitCode
      // read the Ref to see which records were checkpointed by the fake
      checkpointedList    <- refCheckpointedList.get
      _                   <- printLine(s"checkpointedList=$checkpointedList").orDie
    } yield exitCode

}

Advanced usage

If you want more control over your stream, DynamicConsumer.shardedStream can be used:

import nl.vroste.zio.kinesis.client.defaultAwsLayer
import nl.vroste.zio.kinesis.client.dynamicconsumer.DynamicConsumer
import nl.vroste.zio.kinesis.client.serde.Serde
import zio.Console.printLine
import zio._

/**
 * Basic usage example for DynamicConsumer
 */
object DynamicConsumerBasicUsageExample extends ZIOAppDefault {
  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    DynamicConsumer
      .shardedStream(
        streamIdentifier = "my-stream",
        applicationName = "my-application",
        deserializer = Serde.asciiString,
        workerIdentifier = "worker1"
      )
      .flatMapPar(Int.MaxValue) { case (shardId, shardStream, checkpointer) =>
        shardStream
          .tap(record => printLine(s"Processing record ${record} on shard ${shardId}"))
          .tap(checkpointer.stage(_))
          .viaFunction(checkpointer.checkpointBatched[Any](nr = 1000, interval = 5.minutes))
      }
      .runDrain
      .provideLayer(defaultAwsLayer >>> DynamicConsumer.live)
      .exitCode
}

DynamicConsumer is resource-safe thanks to ZIO's Scope: after stream completion, all acquired resources will be shut down.

Running tests and more usage examples

See core/src/test/scala/nl/vroste/zio/kinesis/client/examples for some examples. The integration tests are also good usage examples.

The tests run against a localstack Docker image to access Kinesis, DynamoDB and CloudWatch endpoints locally. In order to run the tests you need Docker and docker-compose installed on your machine. Then open a terminal, navigate to the root of this project and type:

> docker-compose -f docker/docker-compose.yml up -d

To run the tests, enter the following in the terminal:

> sbt test

Don't forget to shut down the docker container after you have finished. In the terminal type:

> docker-compose -f docker/docker-compose.yml down

Credits

The Serde construct in this library is inspired by zio-kafka; the producer was inspired by this AWS blog post.

zio-kinesis's People

Contributors

googley42, hwielenberg, leigh-perry, mergify[bot], mmalik23, mschuwalow, scala-steward, sebver, svroonland

zio-kinesis's Issues

Benchmark Consumer vs KCL

  • Startup time to first record processed
  • Shutdown time
  • Max nr of parallel shards on one machine
  • CPU usage
  • Memory usage
  • How many threads does KCL create and how does it scale with # of shards?

Producer metrics

Nr success
Nr failed
Latency distribution
Custom interval
Per shard perhaps?
Monoidal metrics data structure for easy aggregation

Example project

A self-contained SBT project with a ZIO app that both produces and consumes and does something interesting with the data.

Fix flaky DynamicConsumer test

zio-test output is

[info] - DynamicConsumer
[info]   + consume records produced on all shards produced on the stream
[info]   - support multiple parallel consumers on the same Kinesis stream
[info] Timeout of 5 m exceeded.
[info]   + checkpoint for the last processed record at stream shutdown
[info] Ran 3 tests in 5 m 31 s: 2 succeeded, 0 ignored, 1 failed
Putting records
error1: software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException: Stream zio-test-stream-f512f881-04ca-4287-8e4e-177832cb8f84 under account 000000000000 not found. (Service: Kinesis, Status Code: 400, Request ID: 337d31e0-6013-11eb-9115-878cea0c958a, Extended Request ID: XmEe0KJk+3azcFVQRDk4MJ8HdLo7GMYauq2hgLXMsl/vrwTv8luAVN2DDoyq5Dfuvn8un2Bv8cDB3kKoMBcQa1HFyK/fG2np)

fs2 support

Hi!

Your library is great and I am happy to use it.

What do you think, is it possible to create interop for fs2 streams? We have lots of tools already built on fs2; it would be hard to maintain both effect types (ZIO, cats.effect.IO), and I would be grateful to be able to use your lib with fs2.

Thanks for feedback!

KCL based client does not handle resharding

The KCL backed DynamicConsumer does not seem to handle re-sharding elegantly.

Upon re-sharding while a consumer is running, the existing shard completes processing, but the checkpoint is never updated to SHARD_END. This causes the underlying KCL library to throw exceptions:

j.l.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000001. Application must checkpoint upon shard end. See ShardRecordProcessor.shardEnded javadocs for more information.
	at s.a.k.l.ShutdownTask.call(ShutdownTask.java:138)
	at s.a.k.l.ShardConsumer.executeTask(ShardConsumer.java:327)
	at s.a.k.l.ShardConsumer.lambda$shutdownComplete$3(ShardConsumer.java:306)
	at j.u.c.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

In our case the resharding was done twice. We saw errors like the above for shardId-000000000000; after manually updating the client DDB table to SHARD_END, processing continued until we started seeing the same errors on shardId-000000000001 and shardId-000000000002.

edit: Because there is no checkpoint for shard end, restarting the consumer has no effect; it just hits the same error.

add tableName to DynamicConsumer parameters

The ability to specify the Kinesis lease table tableName in the DynamicConsumer is required. At the moment it defaults to applicationName (see ConfigsBuilder: https://github.com/awslabs/amazon-kinesis-client/blob/8873b1346ff033de01fa1a237c0436d8fb762d9a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java#L87-L92) if it is not specified, and ConfigsBuilder is managed by the DynamicConsumer. We have strict naming conventions for our DynamoDB tables that do not make nice reading for an application name (e.g. when looking at CloudWatch metrics). I was thinking of another optional parameter, e.g. maybeTableName: Option[String] = None.

Add debug logging

For development and user troubleshooting

Possibly using zio-logging? May require #132

use zio-logging inside DynamicConsumer

DynamicConsumer currently does not use logging.

It would be useful to be able to turn on, say, debug-level logging and see log output for key events, e.g. KCL lifecycle events.

Handle discovery of lease lost during checkpointing

During a call to ShardRecordProcessor#processRecords and finishing of processing those records, another worker may steal the lease for the shard. In that situation checkpoints can no longer be made and will fail with a ShutdownException.

The RecordProcessorCheckpointer supports preparing checkpoints, i.e. claiming the shard momentarily to prevent stealing. This is useful to limit non-idempotent side-effects when leases are lost.

Handling this should be done as follows in user land:

shardedStream(..)
  .flatMapPar { case (shardId, shardStream) =>
    shardStream
      .tap(r => processRecord(r))
      .aggregateAsyncWithin(ZTransducer.last, Schedule.fixed(1.second))
      .mapConcat(_.toList)
      .tap(r => r.checkpoint)
      .orElse(ZStream.empty) // This will finish the shard stream
  }

In the ShardQueue we should handle the shutdown of the shard stream (originating from user land) and, combined with the leaseLost signal from the KCL, not recreate the shard stream (as proposed in #127).

This way, only one 'discovery' that the lease is lost is necessary, instead of one for every record still buffered for the shard.

Performance degradation in Producer

After upgrading from zio-kinesis 0.5.0 -> 0.11.0 we noticed a significant impact in performance when using the Producer class to publish records to Kinesis.

We have observed batching-like behaviour where response time is very high initially and improves (though still isn't great) with higher throughput. (Screenshot attached in the original issue.)

Change default checkpointDuration on consumeWith for DynamicConsumer

Change the default checkpointDuration on consumeWith for DynamicConsumer; this is currently 5 seconds and should be 5 minutes.

The AWS documentation states that 5 minutes is the recommended default checkpoint duration for the version of KCL we are using. They state that you should avoid check pointing too frequently as this puts pressure on the infrastructure.

Moreover, in production one of our teams used the default 5 seconds for the DynamicConsumer, and this led to stuck/zombie consumers; re-deployments/restarts were necessary on a daily basis to clear the condition. When we increased the duration to 5 minutes we had 100% reliability.

It's worth noting that the other condition for checkpoint frequency when using consumeWith is batch size, which we have set at 200, ensuring a relatively fine-grained checkpoint frequency.

Also, for all consumers we have strong checkpointing guarantees on failure, based on ZIO's bracket, that we can rely on.

Completely ZIO-native KCL equivalent

Probably a V2 feature, but it would be cool to replace the entire KCL functionality with a ZIO-native version, built on top of this library's low-level Client and some DynamoDB client. Compatible with other KCL apps of course.

Queue filling up when shard stream is ended prematurely

When one of the shard streams is ended for some reason, by a .take(5) for example but the main stream keeps running, the RecordProcessor for that shard will keep on putting items in the queue for that shard but they will never be processed. This is of course a resource leak and unwanted behavior.

Ideally this shard stream is somehow emitted anew from the main stream.

Checkpointing on shutdown

Hi guys

Is there any way to checkpoint the latest processed record when the service has been signalled to shut down?

combine calls to aggregateAsyncWithin with skip on error processing

There is still the general issue that wherever aggregateAsyncWithin is used in zio-kinesis, the record staging/checkpointing will be broken on error due to bug zio/zio#4039 - leading to a loss of data.

A possible fix is to use some function like def processWithSkipOnError(refSkip: Ref[Boolean])(effect: Task[Unit]): ZIO[Any, Throwable, Unit]

Dynamic throttling in Producer (Tap)

Although with #263 Producer will now produce no more than 1000 records/shard/s or 1MB/shard/s, when there is another worker producing records there will still be ProvisionedThroughputExceededExceptions.

We can apply something like the Tap from https://degoes.net/articles/zio-challenge to dynamically find a good throttling rate that minimizes the throughput excess while keeping the throughput as high as possible. Backpressure without reactive streaming.

Interop for regular scala

To ease adoption of this library, find some nice way to use it from regular scala code, Future-based.

Consumer: handle resharding

  • Periodically check for new shards
  • Wait for the parent shards to be completed before processing new shards
  • Checkpoint with SHARD_END for KCL compatibility
  • New shards should have the same worker affinity as the parent shards

Consumer: support KPL-produced records / subsequencenumber checkpointing

Relates to #3

See https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md

  • Consumer:
    • Use sbt-protobuf to generate data classes
    • Detect aggregated records (magic bytes?):
    • Deaggregate records in stream
    • Resume from last checkpoint at subsequence number
  • Producer:
    • Sub-aggregation per shard in PutRecordsBatch to keep records in order
      • Periodic shard refresh (using ExclusiveStartShardId to only load new ones)
      • Maintain map of open shards: shard ID to hash key range
      • Predict shard
      • Invalidate and update shard map when actual shard for record != predicted shard. NOTE: This means records can end up on the wrong shard? I suppose you should not use aggregation in combination with resharding if that's critical functionality; the KPL doesn't promise sequentiality either.
      • Metrics correct for aggregation
      • Correct handling of retries for aggregated records

create a Fake DynamicConsumer implementation

This would enable users to run their consumer apps without Kinesis.

I think this would also be useful in tests to confirm checkpointing behaviour (assuming it contained a Ref of checkpointed records)

One idea is to supply the core nested ZStreams directly. This could then be built on by using ZStreams to stream record data from a file.

local tests not running after CI fix

Local tests do not run any more. I suspect the issue is related to port changes introduced by commit a005500 to fix CI builds; we may need a different port config for local tests vs. tests run on CI.

KCL implementation sets the consumer name incorrectly

Consumer name is set to be the randomly generated workerId. The consumer name is used to construct the ARN of the consumer and is not related to the workerId. I think it's probably best to not explicitly set this and leave it as the default (which will be the applicationName)

https://github.com/svroonland/zio-kinesis/blob/master/src/main/scala/nl/vroste/zio/kinesis/client/DynamicConsumerLive.scala#L151

edit: have managed to work around this by setting workerId=applicationName
