etspaceman / kinesis4cats

A comprehensive Scala library for Kinesis

Home Page: https://etspaceman.github.io/kinesis4cats/

License: Apache License 2.0

Scala 99.92% Dockerfile 0.02% Smithy 0.06%
cats cats-effect kinesis scala

kinesis4cats's People

Contributors

etspaceman, etspaceman-scala-steward-app[bot], kubukoz, mishimishimishi, msosnicki, scala-steward

kinesis4cats's Issues

Docs for fs2 KCL should clarify call to `consumer.commitRecords`

First of all, thank you for a great library! I was able to use it to migrate an old KCL app still using v1 of the AWS KCL.

One hurdle I ran into, however, was that the docs for using the KCLConsumerFS2 show a call to consumer.commitRecords, so I added one in my code as well, thinking it was necessary. Only after testing and debugging did I find that the default config results in auto-committing. Ultimately this resulted in runtime errors when my manual commit occurred, because I was trying to commit a record that had already been committed by the auto-commit:

java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: XXXXX,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: YYYYY,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: YYYYY,SubsequenceNumber: 0}
	software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:124)
	kinesis4cats.kcl.CommittableRecord.$anonfun$checkpoint$1(CommittableRecord.scala:87)
	map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:103)
	map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:98)
	flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$2(package.scala:97)
	map2 @ fs2.Stream$.$anonfun$fromQueueUnterminated$1(Stream.scala:3540)
	map @ kinesis4cats.compat.retry.package$.applyPolicy(package.scala:116)
	flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$1(package.scala:96)

Do you think it would make sense to update the docs to clarify that calling consumer.commitRecords is not necessary unless you've explicitly changed the default RecordProcessor.Config behavior?
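The error follows from the range the KCL enforces on checkpoints: according to the message, a new checkpoint must fall between the last checkpoint and the greatest sequence number passed to the record processor. The simplified model below (illustrative names, not the KCL's API or its actual implementation) shows why a manual commit of an earlier record is rejected once auto-commit has already checkpointed a later one.

```scala
// Simplified model of the bounds check described by the KCL error message.
// This is NOT the KCL's implementation; all names here are illustrative.
final case class ExtendedSeqNum(sequenceNumber: BigInt, subSequenceNumber: Long)

object ExtendedSeqNum {
  // Order by sequence number first, then by sub-sequence number
  implicit val ordering: Ordering[ExtendedSeqNum] =
    Ordering.by((e: ExtendedSeqNum) => (e.sequenceNumber, e.subSequenceNumber))
}

object CheckpointRangeModel {
  private val ord = implicitly[Ordering[ExtendedSeqNum]]

  // Accept a checkpoint only if it is not behind the last checkpoint (when one
  // exists) and not ahead of the greatest sequence number seen by the processor.
  def isCheckpointable(
      candidate: ExtendedSeqNum,
      lastCheckpoint: Option[ExtendedSeqNum],
      greatestSeen: ExtendedSeqNum
  ): Boolean =
    lastCheckpoint.forall(last => ord.lteq(last, candidate)) &&
      ord.lteq(candidate, greatestSeen)
}

object DoubleCommitExample {
  val x = ExtendedSeqNum(BigInt(100), 0L) // older record, committed manually
  val y = ExtendedSeqNum(BigInt(200), 0L) // newest record, already auto-committed

  // After auto-commit, last checkpoint == greatest seen == y, so committing x fails
  val manualCommitAccepted: Boolean =
    CheckpointRangeModel.isCheckpointable(x, lastCheckpoint = Some(y), greatestSeen = y)
}
```

In this model `DoubleCommitExample.manualCommitAccepted` is `false`, which corresponds to the `IllegalArgumentException` above.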

Suggestion: shading of smithy.rules package

Modules that generate code for the smithy.rules and com.amazonaws.kinesis namespaces should probably shade that code under other packages, to avoid binary conflicts when they are used alongside something else that generates code for the same namespaces. At least for smithy.rules, we shouldn't risk users depending on another version and ending up with a binary incompatibility.

This really hinges on one question: should smithy4s be exposed to users in any way other than through the smithy4s dependency?

Client Consumers

The Client Consumers should rival the KCL, be offered for both smithy4s and AWS Java, and handle the following:

  • Lease management of consumer instances against configurable data stores
  • Both Polling and FanOut offerings of data consumption (FanOut is not yet supported in Smithy4s due to the lack of HTTP/2 support)
  • Checkpointing records that have completed processing

Provide a way to wait for a consumer to enter STARTED status

Currently, once you start a consumer (KCL), there's no direct way to know when it is running. As far as I can tell, users have to attach a workerStateChangedListener manually. We should encapsulate this internally and offer a way to wait for the STARTED state, e.g. for testing.

The implementation idea is to create a SignallingRef holding the status, and update it in the listener via cats.effect.std.Dispatcher. Then have a process that observes changes to the SignallingRef and completes when it's STARTED. I have some code for this that could be easily adaptable to the library.
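A minimal sketch of that idea, assuming cats-effect 3.4+ (for `Dispatcher.sequential`) and fs2 3.x. `WorkerState` and `StateListener` are hypothetical stand-ins for the KCL's listener types, so the sketch runs without the KCL on the classpath.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Dispatcher
import fs2.concurrent.SignallingRef

object AwaitStarted {
  // Hypothetical stand-in for the KCL's worker state values
  sealed trait WorkerState
  object WorkerState {
    case object Created extends WorkerState
    case object Initializing extends WorkerState
    case object Started extends WorkerState
  }

  // Hypothetical stand-in for the KCL listener callback, invoked on a KCL thread
  trait StateListener {
    def onWorkerStateChange(state: WorkerState): Unit
  }

  // Bridges the side-effecting callback into IO. A sequential Dispatcher runs
  // the submitted updates one at a time, in the order the callbacks arrive.
  def listenerFor(ref: SignallingRef[IO, WorkerState], dispatcher: Dispatcher[IO]): StateListener =
    new StateListener {
      def onWorkerStateChange(state: WorkerState): Unit =
        dispatcher.unsafeRunAndForget(ref.set(state))
    }

  // Completes once the observed state is Started. `discrete` emits the current
  // value first, so this also completes if Started was reached before subscribing.
  def awaitStarted(ref: SignallingRef[IO, WorkerState]): IO[Unit] =
    ref.discrete.filter(_ == WorkerState.Started).take(1).compile.drain
}

object AwaitStartedDemo extends IOApp.Simple {
  import AwaitStarted._

  val run: IO[Unit] =
    Dispatcher.sequential[IO].use { dispatcher =>
      for {
        ref <- SignallingRef.of[IO, WorkerState](WorkerState.Created)
        listener = listenerFor(ref, dispatcher)
        waiter <- awaitStarted(ref).start
        // Simulate the KCL calling the listener from a thread outside cats-effect
        _ <- IO.blocking {
          listener.onWorkerStateChange(WorkerState.Initializing)
          listener.onWorkerStateChange(WorkerState.Started)
        }
        _ <- waiter.joinWithNever
        _ <- IO.println("consumer reached STARTED")
      } yield ()
    }
}
```

One caveat of `discrete` is that it only guarantees the latest value, so a state that is held only briefly could be skipped. That is acceptable here because a consumer stays in STARTED until shutdown.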

Bake retries into the Producer config

Currently, the kinesis-client Producer has two methods:

  • put
  • putWithRetries

The idea is to have just one put, and to control whether it retries, and with which parameters, through the producer's configuration. Additionally, the default config should use sane defaults, presumably "do retries".
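A synchronous sketch of the proposed shape, in which retry behaviour lives in the config and a single `put` consults it. `RetryConfig`, `ProducerConfig`, `PutOutcome` and `SketchProducer` are hypothetical names, not the kinesis-client module's API, and the real producer is effectful rather than blocking; this only shows the control flow.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

// Hypothetical config: maxRetries = None means "retry until every record succeeds"
final case class RetryConfig(maxRetries: Option[Int], backoff: FiniteDuration)

object RetryConfig {
  // A default that does retry; the numbers are placeholders
  val default: RetryConfig = RetryConfig(maxRetries = Some(5), backoff = 100.millis)
  val disabled: RetryConfig = RetryConfig(maxRetries = Some(0), backoff = Duration.Zero)
}

final case class ProducerConfig(retry: RetryConfig = RetryConfig.default)

// Hypothetical outcome of one batch request: the records that failed
final case class PutOutcome(failed: List[String])

final class SketchProducer(config: ProducerConfig, sendBatch: List[String] => PutOutcome) {
  // The single entry point; whether and how it retries comes from the config
  def put(records: List[String]): PutOutcome = {
    @tailrec
    def loop(pending: List[String], retriesLeft: Option[Int]): PutOutcome = {
      val outcome = sendBatch(pending)
      val mayRetry = retriesLeft.forall(_ > 0)
      if (outcome.failed.isEmpty || !mayRetry) outcome
      else {
        Thread.sleep(config.retry.backoff.toMillis)
        // Only the records that failed are sent again
        loop(outcome.failed, retriesLeft.map(_ - 1))
      }
    }
    loop(records, config.retry.maxRetries)
  }
}
```

With `RetryConfig.disabled`, `put` behaves like the current `put`; with the default config, it behaves like `putWithRetries`.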

Client Producers

Client Producers should compete with the KPL and be offered in both Smithy4s and AWS Java. Features that should be included:

  • Smart batching of requests against a cached shard map
  • Aggregation of records that share a partition key
  • Retryable put methods, which retry failed records in previous batches
  • Interface that batches / streams records in a background stream
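Any batching has to respect the PutRecords request limits: at most 500 records and 5 MiB per request, with partition key bytes counted alongside the data. The sketch below greedily packs records into batches within those limits. The shard-map-aware part of "smart batching" is deliberately left out, and `Rec` and `Batcher` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical record type for the sketch
final case class Rec(partitionKey: String, data: Array[Byte]) {
  // Partition key bytes count toward the request size together with the data
  def sizeInBytes: Int = partitionKey.getBytes(StandardCharsets.UTF_8).length + data.length
}

object Batcher {
  // PutRecords per-request limits
  val MaxRecordsPerRequest: Int = 500
  val MaxBytesPerRequest: Int = 5 * 1024 * 1024

  // Packs records, in order, into batches that respect both limits.
  // Assumes each record already respects the 1 MiB per-record limit.
  def batch(records: List[Rec]): List[List[Rec]] = {
    val (closed, current, _, _) =
      records.foldLeft((List.empty[List[Rec]], List.empty[Rec], 0, 0)) {
        case ((acc, cur, count, bytes), r) =>
          val full = count == MaxRecordsPerRequest || bytes + r.sizeInBytes > MaxBytesPerRequest
          // Close the current batch and start a new one with this record
          if (cur.nonEmpty && full) (cur.reverse :: acc, List(r), 1, r.sizeInBytes)
          else (acc, r :: cur, count + 1, bytes + r.sizeInBytes)
      }
    val all = if (current.isEmpty) closed else current.reverse :: closed
    all.reverse
  }
}
```

For example, 1,200 tiny records come out as batches of 500, 500 and 200. Twelve records of roughly 1 MB each come out as 5, 5 and 2, because the byte limit is reached first.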

Fix Flaky CI Tests

There are a few flaky CI tests that should be investigated as they occur.

Using Smithy4s codegen-based modules results in code generation in downstreams

For example, when you use k4c:smithy4s-client in an app that also generates code, the app ends up generating code for com.amazonaws.kinesis as well. This is not only redundant: these generated classes also take priority over the k4c ones (they're probably first on the classpath), and they don't have all the necessary transformations applied.

I think this is connected to #101, and I have two ideas for fixing it:

  1. Remove the MANIFEST.MF entry that lists the upstream libraries being used:
smithy4sDependencies: com.disneystreaming.alloy:alloy-core:0.1.18,software.amazon.smithy:smithy-rules-engine:1.30.0,com.disneystreaming.smithy:aws-kinesis-spec:2023.02.10
  2. Keep the entry but add a smithy4s.tracking.smithy file listing the namespaces. This might be easier, as it's an addition to the status quo, whereas amending the manifest would require reverting a change introduced by smithy4s codegen. We also already have a snippet that does this:
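Compile / smithy4sCodegen := {
  // Split the codegen output: generated .smithy files other than the tracking file are dropped
  val (toRemove, toKeep) =
    (Compile / smithy4sCodegen).value.partition(f => f.ext == "smithy" && f.base != "smithy4s.tracking")

  // Smithy manifest inside the generated resources directory
  val manifest = (Compile / smithy4sResourceDir).value / "META-INF" / "smithy" / "manifest"

  IO.delete(toRemove)
  // Make the manifest list only the tracking file
  IO.write(manifest, "smithy4s.tracking.smithy\n")

  // Only the remaining files are reported as generated
  toKeep
}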

Producer: Aggregate by Predicted Shard

The KPL will attempt to leverage a shard map and aggregate records that share a predicted shard. The logic for this is as follows:

  • If the shard map is not available for whatever reason (e.g. an error occurred), no aggregation is performed
  • If the shard map is stale, the KPL will produce a batch and detect if the predicted shard differed from the resulting shard. If it detects a difference, the shard map is refreshed and the records are resent.

The KCL will silently discard data in aggregated records if they do not have a calculated hash-key within the received shard's range. So re-sending the records, aggregated against the proper shard-id, is vital to receiving all records. This could result in duplicates though, as some records of the original request may have been successfully produced.
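Shard prediction relies on how Kinesis maps a partition key to a shard: the key's MD5 digest, read as an unsigned 128-bit integer, is the hash key, and each shard owns an inclusive hash key range. The sketch below predicts a shard from a cached shard map and mirrors the consumer-side range check described above. `ShardRange` and `ShardPrediction` are hypothetical names, and explicit hash keys are ignored for simplicity.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// A shard's inclusive hash key range, as reported in a shard listing
final case class ShardRange(shardId: String, startingHashKey: BigInt, endingHashKey: BigInt) {
  def contains(hashKey: BigInt): Boolean =
    hashKey >= startingHashKey && hashKey <= endingHashKey
}

object ShardPrediction {
  // MD5 of the UTF-8 partition key, interpreted as an unsigned integer
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest
      .getInstance("MD5")
      .digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    BigInt(1, digest)
  }

  // None when the shard map is empty or has no matching range; per the logic
  // above, no shard-based aggregation should be attempted in that case.
  def predictShard(partitionKey: String, shardMap: List[ShardRange]): Option[String] = {
    val hk = hashKey(partitionKey)
    shardMap.find(_.contains(hk)).map(_.shardId)
  }

  // Consumer-side view: a sub-record whose hash key falls outside the shard
  // it was read from would be discarded.
  def wouldBeKept(partitionKey: String, receivedFrom: ShardRange): Boolean =
    receivedFrom.contains(hashKey(partitionKey))
}
```

If the shard map is stale, `predictShard` can return a shard that no longer owns the key's hash. Sub-records aggregated on that basis then fail `wouldBeKept` on the consumer side, which is why the resend described above is needed.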

An example: consider a scale-up operation, where new shards are created and their hash ranges are smaller than those in the previous shard map. Some records from the original attempt may have been consumed successfully by the KCL, because their calculated hash key happened to fall within the smaller range, while others were not. Retrying the request would therefore result in duplicates being consumed by the KCL.

This also seems like quite a bit of overhead to save a few requests. So the default today in kinesis4cats's Producer differs from the KPL, in that it only aggregates records that share a partition key. This means that the shard map can be stale or empty, and aggregation can still occur safely.
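The reason partition-key-only aggregation is safe is that every record in a group has the same MD5 hash key, so the whole aggregate maps to a single shard, whatever shard map (if any) the producer holds. The sketch below shows only the grouping step, using hypothetical `UserRecord`, `AggregateGroup` and `PartitionKeyAggregation` names. It does not implement the aggregated record wire format or the size cap on an aggregated record.

```scala
// Hypothetical types for the sketch
final case class UserRecord(partitionKey: String, data: Array[Byte])
final case class AggregateGroup(partitionKey: String, records: List[UserRecord])

object PartitionKeyAggregation {
  // Groups records that share a partition key. Keys appear in first-seen order,
  // and records keep their original order within each group. No shard map is
  // consulted: identical keys always hash to the same shard.
  def group(records: List[UserRecord]): List[AggregateGroup] = {
    val byKey = records.groupBy(_.partitionKey)
    records.map(_.partitionKey).distinct.map(k => AggregateGroup(k, byKey(k)))
  }
}
```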

kinesis4cats should also offer the capabilities that the KPL offers, as an opt-in feature. This issue is for that feature implementation.

References:

https://github.com/awslabs/amazon-kinesis-producer/pull/277/files#diff-d1cbd3793299fd3c06ee3b0232d378f6341e9b5eb64eaa9b7b2094a9c911431fR211

https://github.com/awslabs/amazon-kinesis-producer/blob/master/aws/kinesis/core/aggregator.h

https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AggregatorUtil.java#L145-L150

Improve LogEncoders UX

Currently, whenever you create a component such as a producer or consumer, you need a number of imports for its log encoders. The goal of staying agnostic with respect to logging libraries and encodings is sound, but the user experience of getting these encoders could be improved.

Ideas:

  • make show encoders importable directly from the companion object of the LogEncoders type, e.g. import LogEncoders.show._
  • have uber-imports for e.g. circe which require only one import for all possible encoders you might need. If this is currently not possible due to dependencies being split across modules (e.g. KPL and kinesis-client modules), perhaps there's a way around it
  • maybe make encoders explicit, and default to "show" everywhere
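A sketch of how the first and third ideas could fit together, using hypothetical `ShardId`, `LogEncoder`, `LogEncoders` and `Component` types; kinesis4cats's actual encoder types may be shaped differently. A Show-style bundle lives in the companion object, so that `import LogEncoders.show._` is the only import needed. A method can also fall back to that bundle through a default argument on its implicit parameter when nothing is in scope.

```scala
// Hypothetical types; kinesis4cats's real LogEncoder / LogEncoders may differ.
final case class ShardId(value: String)

trait LogEncoder[A] {
  def encode(a: A): String
}

// All encoders one component needs, passed as a single implicit
final class LogEncoders(val shardIdEncoder: LogEncoder[ShardId])

object LogEncoders {
  // Idea 1: `import LogEncoders.show._` brings a Show-style bundle into scope
  object show {
    implicit val showLogEncoders: LogEncoders =
      new LogEncoders(
        shardIdEncoder = new LogEncoder[ShardId] {
          def encode(a: ShardId): String = a.value
        }
      )
  }
}

object Component {
  // Idea 3: the default argument is used when no implicit LogEncoders is in scope
  def describe(id: ShardId)(implicit encoders: LogEncoders = LogEncoders.show.showLogEncoders): String =
    s"shardId=${encoders.shardIdEncoder.encode(id)}"
}
```

Users who want circe or another encoding would supply their own implicit `LogEncoders`, which takes precedence over the default.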

Add support for Java 8

Environment: Scala 2.13 application running on Java 8

Exception when receiving a message from Kinesis:

Exception in thread "io-compute-3" java.lang.NoSuchMethodError: java.nio.ByteBuffer.rewind()Ljava/nio/ByteBuffer;
	at kinesis4cats.syntax.ByteBufferSyntax$ByteBufferOps$.asArray$extension(bytebuffer.scala:42)
	at kinesis4cats.syntax.ByteBufferSyntax$ByteBufferOps$.asBase64String$extension(bytebuffer.scala:64)
	at kinesis4cats.logging.instances.show$.$anonfun$byteBufferShow$1(show.scala:61)
	at cats.Show$Ops.show(Show.scala:52)
	at cats.Show$Ops.show$(Show.scala:52)
	at cats.Show$ToShowOps$$anon$1.show(Show.scala:57)
	at kinesis4cats.ShowBuilder.$anonfun$add$3(ShowBuilder.scala:55)
	at scala.Option.fold(Option.scala:263)
	at kinesis4cats.ShowBuilder.add(ShowBuilder.scala:55)
	at kinesis4cats.kcl.RecordProcessor$LogEncoders$.$anonfun$show$8(RecordProcessor.scala:478)
	at cats.instances.ListInstances.$anonfun$catsStdShowForList$2(list.scala:278)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
	at scala.collection.IterableOnceOps.addString(IterableOnce.scala:1221)
	at scala.collection.IterableOnceOps.addString$(IterableOnce.scala:1216)
	at scala.collection.AbstractIterator.addString(Iterator.scala:1300)
	at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1166)
	at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
	at scala.collection.AbstractIterator.mkString(Iterator.scala:1300)
	at cats.instances.ListInstances.$anonfun$catsStdShowForList$1(list.scala:278)
	at kinesis4cats.logging.instances.show$.$anonfun$showLogEncoder$1(show.scala:37)
	at kinesis4cats.logging.LogEncoder$$anon$2.encode(LogEncoder.scala:62)
	at kinesis4cats.logging.LogContext.$anonfun$addEncoded$2(LogContext.scala:54)
	at scala.Option.fold(Option.scala:263)
	at kinesis4cats.logging.LogContext.addEncoded(LogContext.scala:54)
	at kinesis4cats.kcl.RecordProcessor.$anonfun$processRecords$3(RecordProcessor.scala:139)
	at cats.effect.IOFiber.succeeded(IOFiber.scala:1170)
	at cats.effect.IOFiber.runLoop(IOFiber.scala:240)
	at cats.effect.IOFiber.execR(IOFiber.scala:1317)
	at cats.effect.IOFiber.run(IOFiber.scala:112)
	at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:485)

The same code works well on JDK 17.
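This `NoSuchMethodError` is the usual symptom of bytecode compiled on JDK 9+ and run on Java 8. Since JDK 9, `ByteBuffer` overrides `rewind()` with a covariant `ByteBuffer` return type, so a call compiled against it links to `ByteBuffer.rewind()Ljava/nio/ByteBuffer;`, a method that Java 8 lacks. There are two common remedies: compile with scalac's `-release:8` option (Scala 2.13, when building on JDK 9+), or call the method through a `java.nio.Buffer` reference. Below is a sketch of the second. `ByteBufferCompat` is a hypothetical stand-in for the library's `ByteBufferSyntax`, whose actual implementation is not shown in this issue.

```scala
import java.nio.{Buffer, ByteBuffer}
import java.util.Base64

object ByteBufferCompat {
  // Copies the buffer's contents from index 0 to its limit into an array,
  // without moving the caller's position (a duplicate has its own position).
  def asArray(buffer: ByteBuffer): Array[Byte] = {
    val dup = buffer.duplicate()
    // Upcast so the call links to Buffer.rewind()Ljava/nio/Buffer;,
    // which exists on Java 8 as well as on newer JDKs.
    (dup: Buffer).rewind()
    val arr = new Array[Byte](dup.remaining())
    dup.get(arr)
    arr
  }

  def asBase64String(buffer: ByteBuffer): String =
    Base64.getEncoder.encodeToString(asArray(buffer))
}
```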
