ZIO Kinesis is a ZIO-based wrapper around the AWS Kinesis SDK. All operations are non-blocking. It provides a streaming interface to Kinesis streams.
The project is in beta. Although it is already used in production by a small number of organisations, expect some issues to pop up. More beta users are welcome.
The library consists of three major components:
- Client and AdminClient: ZIO wrappers around the low-level AWS Kinesis SDK methods. Methods offer a ZIO-native interface with ZStream where applicable, taking care of paginated requests and AWS rate limits.
- DynamicConsumer: a ZStream-based interface to the Kinesis Client Library; an auto-rebalancing and checkpointing consumer.
- Producer: used to produce efficiently and reliably to Kinesis while respecting Kinesis limits. Features batching and failure handling.
Add to your build.sbt:
libraryDependencies += "nl.vroste" %% "zio-kinesis" % "0.4.0"
Your sbt settings must also include the JCenter resolver:
resolvers += Resolver.jcenterRepo
DynamicConsumer offers a ZStream-based interface to the Kinesis Client Library (KCL). KCL stores shard checkpoints in DynamoDB and automatically rebalances shard consumers between multiple workers within an application group.
This is modeled as a stream of streams, where each inner stream represents an individual shard. An inner stream completes when its shard is assigned to another worker, and the outer stream emits a new element whenever a shard is assigned to this worker. The inner streams can be processed in parallel as you see fit.
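To make the stream-of-streams shape concrete, the following plain-Scala sketch models it with standard-library Futures instead of ZStream. The object name, shard IDs and records are hypothetical, and none of this is zio-kinesis API: records within one shard are handled in order, while different shards are handled concurrently, which is roughly what flatMapPar gives you.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A plain-Scala model of the "stream of streams" shape; these are NOT zio-kinesis types.
object ShardedProcessingSketch {
  // Hypothetical outer stream: one (shardId, records) element per shard assigned to this worker.
  val assignedShards: List[(String, List[String])] = List(
    "shardId-000000000000" -> List("a1", "a2", "a3"),
    "shardId-000000000001" -> List("b1", "b2")
  )

  // Records of a single shard are handled one after another, preserving shard order.
  def processShard(shardId: String, records: List[String]): Future[List[String]] =
    Future(records.map(record => s"$shardId/$record"))

  // Different shards are handled concurrently, similar in spirit to flatMapPar.
  def runAll(): List[List[String]] =
    Await.result(
      Future.traverse(assignedShards) { case (shardId, records) => processShard(shardId, records) },
      5.seconds
    )

  def main(args: Array[String]): Unit =
    runAll().flatten.foreach(println)
}
```

Future.traverse keeps the results in the order of the input list, even though the shards are processed concurrently.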
DynamicConsumer handles deserialization of the data bytes as part of the stream, using the Deserializer (or Serde) you pass to it. The example below uses a deserializer for ASCII strings. Custom (de)serializers, for example for JSON data, are easy to define using a JSON library of your choice.
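As a rough illustration of what a custom (de)serializer does, here is a dependency-free sketch. The SimpleSerde trait is a hypothetical stand-in and not zio-kinesis's Serde or Deserializer API, whose exact signatures are not shown here; to avoid pulling in a JSON library, the hypothetical Reading type is encoded as UTF-8 text of the form "sensorId;celsius".

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a (de)serializer; the real zio-kinesis Serde API may differ.
trait SimpleSerde[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

// Hypothetical payload type used only for this example.
final case class Reading(sensorId: String, celsius: Double)

object ReadingSerde extends SimpleSerde[Reading] {
  // Encodes a Reading as UTF-8 text of the form "sensorId;celsius".
  def serialize(value: Reading): Array[Byte] =
    s"${value.sensorId};${value.celsius}".getBytes(StandardCharsets.UTF_8)

  // Decodes the same format; fails with an exception on malformed input.
  def deserialize(bytes: Array[Byte]): Reading = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    text.split(';') match {
      case Array(id, celsius) => Reading(id, celsius.toDouble)
      case _                  => throw new IllegalArgumentException(s"Malformed record: $text")
    }
  }
}
```

This encoding breaks if a sensor ID contains a semicolon; a JSON library avoids such escaping issues, which is why it is the more robust choice for real data.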
Usage example:
import zio._
import nl.vroste.zio.kinesis.client.DynamicConsumer
import nl.vroste.zio.kinesis.client.serde.Serde
val streamName = "my_stream"
val applicationName = "my_awesome_zio_application"
DynamicConsumer
  .shardedStream(
    streamName,
    applicationName = applicationName,
    deserializer = Serde.asciiString
  )
  .flatMapPar(Int.MaxValue) { case (shardId: String, shardStream) =>
    shardStream
      .tap { r: DynamicConsumer.Record[String] =>
        ZIO(println(s"Got record ${r} on shard ${shardId}")) *> r.checkpoint
      }
      .flattenChunks
  }
  .runDrain
- DynamicConsumer is built on ZManaged and is therefore resource-safe: after the stream completes, all acquired resources are shut down.
- The initialPosition parameter of DynamicConsumer.shardedStream, the position in the stream at which the application starts, defaults to TRIM_HORIZON, i.e. the oldest records still retained in the Kinesis stream. However, according to the KCL documentation, the initial position is only used when a lease is first created. When an application restarts, it resumes from the previous checkpoint and so continues where it left off in the Kinesis stream.
- Enhanced fan-out is controlled by the isEnhancedFanOut flag, which defaults to true.
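The initial-position rule above can be summarized in a few lines. This is a simplified, hypothetical model of the decision and not KCL code: a stored checkpoint always takes precedence, and the configured initial position only matters for a shard that has no checkpoint yet.

```scala
// A simplified model of how the starting position is chosen; not KCL's actual code.
sealed trait StartPosition
case object TrimHorizon extends StartPosition
case object Latest extends StartPosition
final case class AfterSequenceNumber(sequenceNumber: String) extends StartPosition

object StartPositionModel {
  // The configured initial position is only used when no checkpoint exists yet for the shard lease.
  def resolve(checkpoint: Option[String], initialPosition: StartPosition): StartPosition =
    checkpoint match {
      case Some(seq) => AfterSequenceNumber(seq) // restart: continue after the last checkpoint
      case None      => initialPosition          // first lease: use the configured position
    }
}
```

In this model, switching initialPosition from TRIM_HORIZON to LATEST has no effect on shards that have already been checkpointed.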
The message handler in the example above calls r.checkpoint for every record. It is recommended not to checkpoint this frequently, since every checkpoint is written to the KCL lease table in DynamoDB. Instead, use a count-based or period-based checkpointing scheme, as in the following example.
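import java.nio.ByteBuffer
import zio._
import zio.stream.ZStreamChunk
import nl.vroste.zio.kinesis.client.DynamicConsumer
import nl.vroste.zio.kinesis.client.serde.Serde
// maxParallel, checkpointDivisor and handler(shardId, record) are assumed to be defined by your application
DynamicConsumer
  .shardedStream(
    streamName,
    applicationName = applicationName,
    deserializer = Serde.byteBuffer
  )
  .flatMapPar(maxParallel) {
    case (shardId: String, shardStream: ZStreamChunk[Any, Throwable, DynamicConsumer.Record[ByteBuffer]]) =>
      shardStream
        .zipWithIndex // number the records of this shard: 0, 1, 2, ...
        .tap {
          case (r: DynamicConsumer.Record[ByteBuffer], sequenceNumberForShard: Long) =>
            handler(shardId, r) *> (
              // checkpoint only on the last record of every batch of checkpointDivisor records
              if (sequenceNumberForShard % checkpointDivisor == checkpointDivisor - 1) r.checkpoint
              else UIO.unit
            )
        }
        .map(_._1) // remove sequence numbering
        .flattenChunks
  }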
In this example, a checkpoint is made once every checkpointDivisor records. The records are counted separately for each shard.
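The count-based rule from the example can be combined with a period-based rule. The following is a hypothetical sketch of such a policy as a pure function; the class name and parameters are illustrative and not part of zio-kinesis, and the caller is assumed to remember when it last checkpointed.

```scala
// A hypothetical checkpoint policy combining the count-based rule from the example above
// with a time-based rule; names and defaults are illustrative, not part of zio-kinesis.
final case class CheckpointPolicy(everyNRecords: Long, maxIntervalMillis: Long) {
  // indexInShard is the zero-based position of the record within its shard stream (as from zipWithIndex).
  // Returns true on the last record of every batch, or when the interval since the last checkpoint has elapsed.
  def shouldCheckpoint(indexInShard: Long, lastCheckpointMillis: Long, nowMillis: Long): Boolean =
    indexInShard % everyNRecords == everyNRecords - 1 ||
      nowMillis - lastCheckpointMillis >= maxIntervalMillis
}
```

With everyNRecords = 3 and ten records arriving quickly, only indices 2, 5 and 8 trigger a checkpoint, so record 9 stays uncheckpointed until more records arrive or the interval elapses. Because the rule is only evaluated when a record arrives, an idle shard keeps its last checkpoint; after a restart, records following that checkpoint are processed again.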
The following snippet shows more of the parameters of DynamicConsumer.shardedStream, most of which relate to authentication against the AWS resources.
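import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.services.cloudwatch.{ CloudWatchAsyncClient, CloudWatchAsyncClientBuilder }
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.common.{ InitialPositionInStream, InitialPositionInStreamExtended }
// awsKey, awsSecret (Strings) and region (software.amazon.awssdk.regions.Region) are assumed to be provided by you
val credentials = StaticCredentialsProvider.create(AwsBasicCredentials.create(awsKey, awsSecret))
val kinesisClientBuilder =
  KinesisAsyncClient
    .builder
    .credentialsProvider(credentials)
    .region(region)
val cloudWatchClientBuilder: CloudWatchAsyncClientBuilder =
  CloudWatchAsyncClient
    .builder
    .credentialsProvider(credentials)
    .region(region)
val dynamoDbClientBuilder =
  DynamoDbAsyncClient
    .builder
    .credentialsProvider(credentials)
    .region(region)
val initialPosition = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
DynamicConsumer
  .shardedStream(
    streamName,
    applicationName = applicationName,
    deserializer = Serde.byteBuffer,
    kinesisClientBuilder = kinesisClientBuilder,
    cloudWatchClientBuilder = cloudWatchClientBuilder,
    dynamoDbClientBuilder = dynamoDbClientBuilder,
    initialPosition = initialPosition
  )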
The low-level Client offers a putRecords method to put records on Kinesis. Although it is simple to use for a small number of records, there are many pitfalls when it comes to producing a high volume of records efficiently and reliably.
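One of those pitfalls is that a single PutRecords request is limited in size. The sketch below, which is illustrative and not the Producer's actual implementation, greedily packs records into batches that respect the PutRecords limits as documented by AWS at the time of writing: at most 500 records and 5 MiB per request (partition keys included), and at most 1 MiB per record. The OutgoingRecord type and PutRecordsBatcher object are hypothetical; check the current AWS documentation for the exact limits.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical record type; its size counts the UTF-8 partition key bytes plus the data bytes.
final case class OutgoingRecord(partitionKey: String, data: Array[Byte]) {
  def sizeInBytes: Long = partitionKey.getBytes(StandardCharsets.UTF_8).length.toLong + data.length
}

object PutRecordsBatcher {
  // PutRecords limits as documented by AWS (verify against current documentation).
  val MaxRecordsPerRequest = 500
  val MaxBytesPerRequest: Long = 5L * 1024 * 1024
  val MaxBytesPerRecord: Long = 1L * 1024 * 1024

  // Greedily packs records, in order, into batches that stay within both request limits.
  def batch(records: Seq[OutgoingRecord]): Vector[Vector[OutgoingRecord]] = {
    require(records.forall(_.sizeInBytes <= MaxBytesPerRecord), "a record exceeds 1 MiB")
    val batches = Vector.newBuilder[Vector[OutgoingRecord]]
    var current = Vector.empty[OutgoingRecord]
    var currentBytes = 0L
    records.foreach { record =>
      val size = record.sizeInBytes
      // Start a new batch when adding this record would exceed the count or byte limit.
      if (current.size == MaxRecordsPerRequest || currentBytes + size > MaxBytesPerRequest) {
        batches += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= record
      currentBytes += size
    }
    if (current.nonEmpty) batches += current
    batches.result()
  }
}
```

Small records are limited by the record count (1200 tiny records become batches of 500, 500 and 200), whereas records of about 1 MB are limited by the 5 MiB request size, so only five fit in one request.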
Producer helps you achieve high throughput by batching records and respecting the Kinesis request (rate) limits. Records that cannot be produced due to temporary errors, such as shard rate limits, are retried.
All of this comes with the robust failure handling you would expect from a ZIO-based library.
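A PutRecords call can partially fail: the response lists one result per request entry, in request order, and failed entries carry an error code (ProvisionedThroughputExceededException or InternalFailure). The following sketch shows how the records to retry could be selected from such a response. PutResult and RetrySelection are hypothetical names that model the response, not the AWS SDK or zio-kinesis types.

```scala
// Hypothetical per-record outcome of a PutRecords call; the AWS response returns results
// in the same order as the request entries, and failed entries carry an error code.
final case class PutResult(errorCode: Option[String])

object RetrySelection {
  // Error codes that PutRecords reports for individual failed records.
  val RetriableErrors = Set("ProvisionedThroughputExceededException", "InternalFailure")

  // Pairs each request entry with its result by position and keeps the ones worth retrying.
  def recordsToRetry[A](requested: Seq[A], results: Seq[PutResult]): Seq[A] = {
    require(requested.size == results.size, "results must align with requested records")
    requested.zip(results).collect {
      case (record, PutResult(Some(code))) if RetriableErrors.contains(code) => record
    }
  }
}
```

In practice retries should be combined with a backoff, and a retried record can end up on Kinesis after records that were sent later, so per-key ordering is not guaranteed across retries.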
Usage example:
import zio._
import nl.vroste.zio.kinesis.client._
import serde._
import Client.ProducerRecord
val streamName = "my_stream"
(for {
  client   <- Client.create
  producer <- Producer.make(streamName, client, Serde.asciiString)
} yield producer).use { producer =>
  val records = (1 to 100).map(j => ProducerRecord(s"key${j}", s"message${j}"))
  producer
    .produceChunk(Chunk.fromIterable(records)) *>
    ZIO(println("All records in the chunk were produced"))
}
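The partition keys in the example above (key1, key2, ...) determine which shard each record goes to. Kinesis hashes the partition key with MD5 into a 128-bit integer, and each shard owns a range of that hash key space. The sketch below reproduces the hashing step; PartitionKeyHashing is a hypothetical helper, and the shardIndex function additionally assumes shards with equal-sized hash key ranges, whereas for a real stream you would look up each shard's HashKeyRange.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object PartitionKeyHashing {
  // MD5 of the partition key, interpreted as an unsigned 128-bit integer.
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest.getInstance("MD5").digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    BigInt(1, digest) // signum 1: treat the 16 digest bytes as a non-negative magnitude
  }

  // Assumes shardCount shards splitting the 128-bit key space into equal ranges.
  // Real streams should use each shard's HashKeyRange instead.
  def shardIndex(partitionKey: String, shardCount: Int): Int = {
    val rangeSize = (BigInt(1) << 128) / shardCount
    (hashKey(partitionKey) / rangeSize).min(BigInt(shardCount - 1)).toInt
  }
}
```

Since the mapping is deterministic, records with the same partition key always land on the same shard, which is what preserves their relative order.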
This example shows how the low-level Client can be used for more control over the consuming process. It processes all shards of a stream from the beginning, using an existing registered consumer. You will have to track the current shard positions yourself, using some external storage mechanism.
import nl.vroste.zio.kinesis.client.Client
import nl.vroste.zio.kinesis.client.Client.ShardIteratorType
import nl.vroste.zio.kinesis.client.serde.Serde
import zio.Task
val streamName = "my_stream"
val consumerARN = "arn:aws:etc"
Client.create.use { client =>
  val stream = client
    .listShards(streamName)
    .map { shard =>
      client.subscribeToShard(
        consumerARN,
        shard.shardId(),
        ShardIteratorType.TrimHorizon,
        Serde.asciiString
      )
    }
    .flatMapPar(Int.MaxValue) { shardStream =>
      shardStream.mapM { record =>
        // Do something with the record here
        // println(record.data)
        // and finally checkpoint the sequence number
        // customCheckpointer.checkpoint(record.shardID, record.sequenceNumber)
        Task.unit
      }
    }
  stream.runDrain
}
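The customCheckpointer in the comments above is left to you. As a minimal sketch of its interface, here is a hypothetical in-memory version with the same checkpoint(shardId, sequenceNumber) shape. It assumes that sequence numbers within a shard are decimal strings that increase over time, and it keeps positions only in memory, so a real application would persist them to durable external storage instead.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for the customCheckpointer mentioned in the example above.
// A real application would persist positions to durable external storage instead.
final class InMemoryCheckpointer {
  private val positions = TrieMap.empty[String, String]

  // Stores the sequence number for a shard unless an equal or larger one is already stored.
  // Assumes sequence numbers are decimal strings that increase over time within a shard.
  def checkpoint(shardId: String, sequenceNumber: String): Unit = {
    @annotation.tailrec
    def loop(): Unit = positions.get(shardId) match {
      case None =>
        // Another fiber/thread may have stored a value first; if so, retry the comparison.
        if (positions.putIfAbsent(shardId, sequenceNumber).isDefined) loop()
      case Some(current) =>
        // Compare-and-set so concurrent callers never move the position backwards.
        if (BigInt(sequenceNumber) > BigInt(current) && !positions.replace(shardId, current, sequenceNumber)) loop()
    }
    loop()
  }

  // The last stored sequence number for a shard, if any; the point to resume from after a restart.
  def lastCheckpoint(shardId: String): Option[String] = positions.get(shardId)
}
```

On restart, the stored value would be used to resume each shard after that sequence number instead of starting again from TrimHorizon.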
More administrative operations, such as creating and deleting streams, are available in the AdminClient. Refer to the AWS Kinesis Streams API Reference for more information.
By default, Client, AdminClient, DynamicConsumer and Producer load AWS credentials and the region via the Default Credential Provider and Default Region Provider. Many parameters can be customized using the client builders. Refer to the AWS documentation for more information.
For more usage examples, refer to the unit tests.
The Serde construct in this library is inspired by zio-kafka, and the producer by this AWS blog post.