etspaceman / kinesis4cats
A comprehensive Scala library for Kinesis
Home Page: https://etspaceman.github.io/kinesis4cats/
License: Apache License 2.0
First of all, thank you for a great library! I was able to use it to migrate an old KCL app still using v1 of the AWS KCL.
One hurdle I ran into, however, was that the docs for using the KCLConsumerFS2 show a call to consumer.commitRecords, so I added one in my code as well, thinking it was necessary. Only after testing and debugging did I find that the default config auto-commits. This resulted in runtime errors when my manual commit ran, because it tried to commit a record that the auto-commit had already committed:
java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: XXXXX,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: YYYYY,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: YYYYY,SubsequenceNumber: 0}
software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:124)
kinesis4cats.kcl.CommittableRecord.$anonfun$checkpoint$1(CommittableRecord.scala:87)
map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:103)
map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:98)
flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$2(package.scala:97)
map2 @ fs2.Stream$.$anonfun$fromQueueUnterminated$1(Stream.scala:3540)
map @ kinesis4cats.compat.retry.package$.applyPolicy(package.scala:116)
flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$1(package.scala:96)
Do you think it would make sense to update the docs to clarify that calling consumer.commitRecords is not necessary unless you've explicitly changed the default RecordProcessor.Config behavior?
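The error above comes from the KCL's checkpoint range check: a new checkpoint must lie between the last checkpoint and the greatest sequence number delivered to the record processor. The sketch below is a simplified model of that check for illustration only. `CheckpointState` and `DoubleCommitExample` are hypothetical names (not KCL or kinesis4cats APIs), sequence numbers are plain `BigInt`s, and sub-sequence numbers are ignored. It assumes the auto-commit has already checkpointed the last record of the batch, which is one way a later manual commit of an earlier record gets rejected.

```scala
// Hypothetical, simplified model of the KCL checkpoint range check.
// Not a KCL or kinesis4cats API; sub-sequence numbers are ignored.
final case class CheckpointState(lastCheckpoint: BigInt, greatestSeen: BigInt) {

  // Accepts a checkpoint only if lastCheckpoint <= seq <= greatestSeen,
  // mirroring the "acceptable range" described in the error message.
  def checkpoint(seq: BigInt): Either[String, CheckpointState] =
    if (lastCheckpoint <= seq && seq <= greatestSeen) Right(copy(lastCheckpoint = seq))
    else Left(s"Could not checkpoint at $seq: outside [$lastCheckpoint, $greatestSeen]")
}

object DoubleCommitExample {
  // Records 100..105 have been delivered; the previous checkpoint was 99.
  val delivered: CheckpointState = CheckpointState(lastCheckpoint = 99, greatestSeen = 105)

  // The auto-commit checkpoints at the last record of the batch.
  val afterAutoCommit: Either[String, CheckpointState] = delivered.checkpoint(105)

  // A later manual commit of an earlier record in the same batch is now out of range.
  val afterManualCommit: Either[String, CheckpointState] =
    afterAutoCommit.flatMap(_.checkpoint(100))
}
```

Committing the same sequence number again would still pass this check, since both bounds are inclusive; it is committing something older than what the auto-commit already recorded that fails.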
Modules that generate code for smithy.rules and com.amazonaws.kinesis should probably shade them under other packages to avoid binary conflicts when used alongside something else that generates code for these namespaces. At least for smithy.rules, I feel we shouldn't risk people using another version and ending up with a binary incompatibility.
This really depends on the question: should the smithy4s-ness be exposed to users in ways other than the smithy4s dependency?
The Client Consumers should rival the KCL, be offered for both smithy4s and AWS Java, and handle the following:
Currently, once you start a consumer (KCL), you don't really know when it's running: as far as I can tell, as a user you have to manually attach a workerStateChangeListener. We should encapsulate this internally and offer a way to wait for startup, e.g. for the purpose of testing.
The implementation idea is to create a SignallingRef holding the status and update it in the listener via cats.effect.std.Dispatcher, then have a process that observes changes to the SignallingRef and completes when the status is STARTED. I have some code for this that could be easily adapted to the library.
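A minimal sketch of that idea, assuming cats-effect 3.4+ (for `Dispatcher.sequential`) and fs2 3.x (for `SignallingRef`). To keep it self-contained, `WorkerState` and `StateListener` are hypothetical stand-ins for the KCL's `WorkerStateChangeListener` and its worker-state enum; in a real consumer the listener would be registered with the KCL rather than called directly, and `StartupSignal` is likewise a hypothetical name.

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import fs2.concurrent.SignallingRef

// Hypothetical stand-in for the KCL worker states.
sealed trait WorkerState
object WorkerState {
  case object Created extends WorkerState
  case object Initializing extends WorkerState
  case object Started extends WorkerState
  case object ShutDown extends WorkerState
}

// Hypothetical stand-in for the KCL listener: a side-effecting callback that is
// invoked from threads cats-effect does not manage.
trait StateListener {
  def onWorkerStateChange(state: WorkerState): Unit
}

object StartupSignal {

  // Returns a listener to hand to the consumer, plus an IO that completes
  // once the observed state becomes Started.
  def make: Resource[IO, (StateListener, IO[Unit])] =
    for {
      // A sequential dispatcher runs the submitted state updates one at a time.
      dispatcher <- Dispatcher.sequential[IO]
      state <- Resource.eval(SignallingRef[IO, WorkerState](WorkerState.Created))
    } yield {
      val listener = new StateListener {
        def onWorkerStateChange(newState: WorkerState): Unit =
          dispatcher.unsafeRunAndForget(state.set(newState))
      }
      // `discrete` starts with the current value, so a Started state that was
      // set before this stream subscribed is still observed.
      val awaitStarted: IO[Unit] =
        state.discrete.find(_ == WorkerState.Started).compile.drain
      (listener, awaitStarted)
    }
}
```

In a test, `awaitStarted` would typically be combined with a timeout. Note that `discrete` can skip intermediate values, so if the worker moved from Started to ShutDown very quickly this sketch would keep waiting; a production version would probably also complete (or fail) on shutdown.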
Publish Smithy4s libraries for both ScalaJS and Scala Native.
Currently, the kinesis-client Producer has two methods:
The idea is to have just one put, and control retry-ness and retry parameters via the configuration of the producer. Additionally, the default config should set some sane defaults, presumably "do retries".
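A minimal sketch of a single, config-driven put, assuming cats-effect 3. `RetryConfig`, `SinglePut`, and the `attempt` callback (which sends a batch and returns the records that failed) are hypothetical names for illustration, not the kinesis-client Producer's actual types.

```scala
import cats.effect.IO
import scala.concurrent.duration._

// Hypothetical producer retry settings; not the kinesis4cats configuration type.
final case class RetryConfig(maxRetries: Int, backoff: FiniteDuration)

object RetryConfig {
  // A "sane default": retry failed records a few times.
  val default: RetryConfig = RetryConfig(maxRetries = 5, backoff = 100.millis)
  // Opt-out: a single attempt and no retries.
  val disabled: RetryConfig = RetryConfig(maxRetries = 0, backoff = Duration.Zero)
}

object SinglePut {
  // Hypothetical single `put`: `attempt` sends a batch and returns the records that failed.
  // Retry behaviour comes entirely from `config`; the result holds records that still failed.
  def put[A](records: List[A], config: RetryConfig)(attempt: List[A] => IO[List[A]]): IO[List[A]] = {
    def loop(pending: List[A], retriesLeft: Int): IO[List[A]] =
      attempt(pending).flatMap { failed =>
        if (failed.isEmpty || retriesLeft <= 0) IO.pure(failed)
        else IO.sleep(config.backoff) >> loop(failed, retriesLeft - 1)
      }
    loop(records, config.maxRetries)
  }
}
```

Only the records that failed are re-sent on each retry, and callers who want the old non-retrying behaviour would pass `RetryConfig.disabled` instead of calling a different method.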
Client Producers should compete with the KPL and be offered in both Smithy4s and AWS Java. Features that should be included:
There are a few flaky CI tests that should be looked into as they occur
For example, when you use k4c:smithy4s-client in an app that also generates code, the app will end up generating code for com.amazonaws.kinesis. This is not only redundant; these generated classes also take priority over the k4c ones (they're probably first on the classpath), and they don't have all the necessary transformations applied.
I think this is connected to #101, and I have two ideas for fixing it:
smithy4sDependencies: com.disneystreaming.alloy:alloy-core:0.1.18,software.amazon.smithy:smithy-rules-engine:1.30.0,com.disneystreaming.smithy:aws-kinesis-spec:2023.02.10
smithy4s.tracking.smithy file listing the namespaces. This might be easier, as it's an addition over the status quo, whereas amending the manifest would require reverting a smithy4s-codegen-induced change. We also already have a snippet that does this:
Compile / smithy4sCodegen := {
  // Split the codegen output: .smithy files other than smithy4s.tracking.smithy are removed,
  // and every other generated file (including the tracking file) is kept.
  val (toRemove, toKeep) =
    (Compile / smithy4sCodegen).value.partition(f => f.ext == "smithy" && f.base != "smithy4s.tracking")
  val manifest = (Compile / smithy4sResourceDir).value / "META-INF" / "smithy" / "manifest"
  IO.delete(toRemove)
  // Rewrite the manifest so it lists only the tracking file.
  IO.write(manifest, "smithy4s.tracking.smithy\n")
  toKeep
}
The KPL will attempt to leverage a shard-map and aggregate records that share a predicted shard. The logic for this is as follows:
The KCL silently discards data in aggregated records whose calculated hash key does not fall within the receiving shard's range. So re-sending those records, aggregated against the proper shard ID, is vital to receiving all records. This could result in duplicates, though, as some records of the original request may have been produced successfully.
For example, consider a scale-up operation, where new shards are created whose hash ranges are smaller than those in the previous shard map. It is possible that some records of the original attempt were consumed successfully by the KCL, because their calculated hash keys fell into that smaller range, while others were not. A retry of the request would then result in duplicates being consumed by the KCL.
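To make the scale-up example concrete, here is a toy model of the KCL filtering step. Hash ranges use small numbers instead of real 128-bit values, and `SubRecord`, `HashRange`, and `SplitScenario` are hypothetical names used only for illustration.

```scala
// Hypothetical: one user record inside an aggregated record, with its computed hash key.
final case class SubRecord(hashKey: BigInt, payload: String)

// Hypothetical: a shard's inclusive hash-key range.
final case class HashRange(start: BigInt, end: BigInt) {
  def contains(k: BigInt): Boolean = start <= k && k <= end
}

object SplitScenario {
  // Models the KCL behaviour described above: sub-records whose hash key lies outside
  // the receiving shard's range are dropped. Returns (consumed, dropped).
  def deliver(shard: HashRange, aggregated: List[SubRecord]): (List[SubRecord], List[SubRecord]) =
    aggregated.partition(s => shard.contains(s.hashKey))

  // Toy numbers: a parent shard covering [0, 99] has split into [0, 49] and [50, 99].
  val childLow: HashRange = HashRange(0, 49)
  val childHigh: HashRange = HashRange(50, 99)

  // Aggregated against the stale parent shard, the whole record is routed to childLow.
  val stale: List[SubRecord] = List(SubRecord(10, "a"), SubRecord(70, "b"))
  val (consumed, dropped) = deliver(childLow, stale)
}
```

With these numbers, `consumed` contains the sub-record with payload "a" and `dropped` the one with "b"; re-sending the original request would deliver "a" a second time.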
This also seems like quite a bit of overhead to save a few requests. So the default today in kinesis4cats's Producer differs from the KPL, in that it only aggregates records that share a partition key. This means that the shard map can be stale or empty, and aggregation can still occur safely.
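The two grouping strategies can be contrasted in a short sketch. It relies on Kinesis mapping a partition key to a 128-bit hash key via MD5 (encoding the key as UTF-8 is an assumption here). `ShardRange`, `UserRecord`, and `Aggregation` are hypothetical names, and size limits on aggregated records are ignored.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical shard-map entry with an inclusive hash-key range.
final case class ShardRange(shardId: String, start: BigInt, end: BigInt) {
  def contains(hashKey: BigInt): Boolean = start <= hashKey && hashKey <= end
}

// Hypothetical user record prior to aggregation.
final case class UserRecord(partitionKey: String, data: Array[Byte])

object Aggregation {
  // 128-bit hash key: the MD5 digest of the partition key read as an unsigned integer.
  def hashKey(partitionKey: String): BigInt =
    BigInt(1, MessageDigest.getInstance("MD5").digest(partitionKey.getBytes(StandardCharsets.UTF_8)))

  // kinesis4cats-style default: only records sharing a partition key are grouped, so each
  // group has a single hash key and the shard map can be stale or empty.
  def byPartitionKey(records: List[UserRecord]): Map[String, List[UserRecord]] =
    records.groupBy(_.partitionKey)

  // KPL-style: group by the shard predicted from the (possibly stale) shard map.
  // Records with no matching shard are keyed by None and would need a fallback.
  def byPredictedShard(
      records: List[UserRecord],
      shardMap: List[ShardRange]
  ): Map[Option[String], List[UserRecord]] =
    records.groupBy { r =>
      val hk = hashKey(r.partitionKey)
      shardMap.find(_.contains(hk)).map(_.shardId)
    }
}
```

The predicted-shard grouping packs more records per request, but its correctness depends on the shard map being current, which is exactly the overhead and duplicate risk described above.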
kinesis4cats should also offer the capabilities that the KPL offers, as an opt-in feature. This issue is for that feature implementation.
References:
https://github.com/awslabs/amazon-kinesis-producer/blob/master/aws/kinesis/core/aggregator.h
Currently, whenever you create things, you need a number of imports for their log encoders. While the goal of being agnostic with respect to libraries and encodings is sound, the user experience of getting these encoders could be improved.
Ideas:
show (or circe) encoders importable directly from the companion object of the LogEncoders type, e.g. import LogEncoders.show._, which require only one import for all possible encoders you might need. If this is currently not possible due to dependencies being split across modules (e.g. the KPL and kinesis-client modules), perhaps there's a way around it.
Proposing to replace the Slf4jLogger instantiation with creating a logger through LoggerFactory.
Currently, you can only make one from KinesisAsyncClient, which is wrapped internally; it'd be nice to support the wrapper as a parameter too.
Environment: Scala 2.13 application running on Java 8
Exception when receiving messages from Kinesis:
Exception in thread "io-compute-3" java.lang.NoSuchMethodError: java.nio.ByteBuffer.rewind()Ljava/nio/ByteBuffer;
at kinesis4cats.syntax.ByteBufferSyntax$ByteBufferOps$.asArray$extension(bytebuffer.scala:42)
at kinesis4cats.syntax.ByteBufferSyntax$ByteBufferOps$.asBase64String$extension(bytebuffer.scala:64)
at kinesis4cats.logging.instances.show$.$anonfun$byteBufferShow$1(show.scala:61)
at cats.Show$Ops.show(Show.scala:52)
at cats.Show$Ops.show$(Show.scala:52)
at cats.Show$ToShowOps$$anon$1.show(Show.scala:57)
at kinesis4cats.ShowBuilder.$anonfun$add$3(ShowBuilder.scala:55)
at scala.Option.fold(Option.scala:263)
at kinesis4cats.ShowBuilder.add(ShowBuilder.scala:55)
at kinesis4cats.kcl.RecordProcessor$LogEncoders$.$anonfun$show$8(RecordProcessor.scala:478)
at cats.instances.ListInstances.$anonfun$catsStdShowForList$2(list.scala:278)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1221)
at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1216)
at scala.collection.AbstractIterator.addString(Iterator.scala:1300)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1166)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
at scala.collection.AbstractIterator.mkString(Iterator.scala:1300)
at cats.instances.ListInstances.$anonfun$catsStdShowForList$1(list.scala:278)
at kinesis4cats.logging.instances.show$.$anonfun$showLogEncoder$1(show.scala:37)
at kinesis4cats.logging.LogEncoder$$anon$2.encode(LogEncoder.scala:62)
at kinesis4cats.logging.LogContext.$anonfun$addEncoded$2(LogContext.scala:54)
at scala.Option.fold(Option.scala:263)
at kinesis4cats.logging.LogContext.addEncoded(LogContext.scala:54)
at kinesis4cats.kcl.RecordProcessor.$anonfun$processRecords$3(RecordProcessor.scala:139)
at cats.effect.IOFiber.succeeded(IOFiber.scala:1170)
at cats.effect.IOFiber.runLoop(IOFiber.scala:240)
at cats.effect.IOFiber.execR(IOFiber.scala:1317)
at cats.effect.IOFiber.run(IOFiber.scala:112)
at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:485)
The same code works fine on JDK 17.
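This `NoSuchMethodError` is the usual symptom of code compiled on JDK 9+ against `ByteBuffer.rewind()`, whose covariant `ByteBuffer` return type only exists since Java 9; Java 8 only has `Buffer.rewind()` returning `Buffer`. Compiling against the Java 8 API (for example via scalac's `-release` option) or calling the method through a `Buffer`-typed reference avoids it. Below is a minimal sketch of the second approach; `ByteBufferCompat.asArray` is a hypothetical helper, not the library's actual `asArray`, whose exact semantics are only known here from the stack trace.

```scala
import java.nio.{Buffer, ByteBuffer}

object ByteBufferCompat {
  // Copies every byte between index 0 and the limit into a new array, leaving the
  // caller's buffer position untouched (the work happens on a duplicate view).
  def asArray(bb: ByteBuffer): Array[Byte] = {
    val view = bb.duplicate()
    // Ascribing the type Buffer makes the compiler emit Buffer.rewind()Ljava/nio/Buffer;,
    // which exists on Java 8, instead of the Java 9+ ByteBuffer.rewind() override.
    (view: Buffer).rewind()
    val out = new Array[Byte](view.remaining())
    view.get(out)
    out
  }
}
```

The same upcast applies to the other `Buffer` methods that gained covariant overrides in Java 9, such as `flip`, `position(int)`, and `limit(int)`.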