Giter Site home page Giter Site logo

Kinesis Client Library v2 about kinesalite HOT 17 OPEN

mhart avatar mhart commented on June 13, 2024 10
Kinesis Client Library v2

from kinesalite.

Comments (17)

Harshank avatar Harshank commented on June 13, 2024 13

Any updates on this issue? Thanks!

from kinesalite.

oripwk avatar oripwk commented on June 13, 2024 12

I confirm AWS v2 is not working. I had and solved the following issues:

  1. io.netty.handler.codec.http2.Http2Exception: First received frame was not SETTINGS. Hex dump for first 5 bytes: 3c68656164
    Was solved by configuring Netty to use HTTP 1.1
  2. software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 502, Request ID: null)
    Was solved by setting env var CBOR_ENABLED to false

Now it's stuck on getting HTTP 400 error.

So AWS Java SDK v2 is not supported

from kinesalite.

prascuna avatar prascuna commented on June 13, 2024 4

I managed to make some progress on this, but I eventually gave up.
Kinesalite doesn't seem to support HTTP2, so I injected an HTTP1_1 client into KinesisAsyncClient.
Also I had to disable CBOR via system property and I had to import kinesalite's certificate in my JKS like that:

sudo keytool -import -alias kinesalite -keystore "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre/lib/security/cacerts" -file server-crt.pem
  System.setProperty("aws.cborEnabled", "false")
  val kinesisClient = KinesisAsyncClient
    .builder()
    .region(Region.US_EAST_1)
    .httpClient(NettyNioAsyncHttpClient.builder().protocol(Protocol.HTTP1_1).build())
    .endpointOverride(new URI("https://kinesalite:4567"))
    .build()

At this point the connection could be established, but kinesalite doesn't implement a bunch of other features ( I just stopped at DescribeStreamSummary and DescribeStreamConsumer)

from kinesalite.

yonekawa avatar yonekawa commented on June 13, 2024 1

KCL v2 use fan-out strategy as default. the strategy use RegisterStreamConsumer and SubscribeToShard that is not supported by kinesalite.
https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java#L91

I guess we can change strategy to polling by set RetrievalConfig to Scheduler to be use GetRecords API.

from kinesalite.

etspaceman avatar etspaceman commented on June 13, 2024 1

You can get the V2 library to work w/ localstack (kinesalite), see comment here:

localstack/localstack#893 (comment)

from kinesalite.

arjantop avatar arjantop commented on June 13, 2024

@hoangtrucit Are you trying to use the new SubscribeToShard api?

from kinesalite.

prascuna avatar prascuna commented on June 13, 2024

Similar issue here, using KCL 2.0.1

[aws-java-sdk-NettyEventLoop-2-3] ERROR software.amazon.awssdk.http.nio.netty.internal.RunnableRequest - Failed to create connection to http://localhost:4567/
java.io.IOException: The channel was closed before the protocol could be determined.
	at software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer$1.channelUnregistered(ChannelPipelineInitializer.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:160)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:160)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1412)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:865)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:830)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:745)

from kinesalite.

MariaLapovska avatar MariaLapovska commented on June 13, 2024

@prascuna does that mean that kinesalite doesn't fully support the KCL v2 at the moment? I followed your advice and it helped to set the connection with kinesis, but I'm stiil getting the following exception, during DescribeStreamSummary request:

2018-12-17 17:49:42:600 36344 [main] ERROR s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 400, Request ID: 5e94b930-0213-11e9-82e8-6981636a0842)
	at software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:95)
	at software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:56)
	at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46)
	at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30)
	at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92)
	at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:262)
	at com.typesafe.netty.HandlerPublisher.complete(HandlerPublisher.java:408)
	at com.typesafe.netty.HandlerPublisher.handlerRemoved(HandlerPublisher.java:395)
	at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:670)
	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:505)
	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:451)
	at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
	at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
	at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:748)

from kinesalite.

etspaceman avatar etspaceman commented on June 13, 2024

@mhart What's the status of this? We'd really like to use kinesalite for the 2.x consumer as the 1.x consumer hasn't had a release for nearly a year.

from kinesalite.

mhart avatar mhart commented on June 13, 2024

@etspaceman the status is that it's still open. I know nothing about KCL v2 – it seems like there's a lot of work involved in supporting this, but perhaps the localstack folks know more?

from kinesalite.

israelcolomer avatar israelcolomer commented on June 13, 2024

We really need this feature too! And I presume more will follow as using the KCL v2 becomes more standardised. Looks like on localstack they're waiting for kinesalite to address it (referencing this current issue) on localstack/localstack#893.

from kinesalite.

rwinograd avatar rwinograd commented on June 13, 2024

@mhart regarding your question about complexity, LocalStack has patched their layer that sits before Kinesalite to add API implementations for the missing APIs (RegisterStreamConsumer, DeregisterStreamConsumer, ListStreamConsumers, and DescribeStreamConsumer).

They back the register/deregister/list with an in-memory data structure and avoid any complexity from the NextToken API. That seems to have been sufficient to fix the KCL v2 integration issues.

from kinesalite.

slnowak avatar slnowak commented on June 13, 2024

@rwinograd isn't SubscribeToShard missing though? I'm using latest version of localstack and I'm unable to make it working with KCLv2 (well - it is possible, but you have to set RetrievalSpecificConfig to PollingConfig in KCL settings)

from kinesalite.

Anja05 avatar Anja05 commented on June 13, 2024

I confirm AWS v2 is not working. I had and solved the following issues:

  1. io.netty.handler.codec.http2.Http2Exception: First received frame was not SETTINGS. Hex dump for first 5 bytes: 3c68656164
    Was solved by configuring Netty to use HTTP 1.1
  2. software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 502, Request ID: null)
    Was solved by setting env var CBOR_ENABLED to false

Now it's stuck on getting HTTP 400 error.

So AWS Java SDK v2 is not supported

Hi @oripwk did you solve this problem. I m also facing the same issue.Stuck on getting 400 error. Please suggest if you resolve this problem

from kinesalite.

prateeksinghal10 avatar prateeksinghal10 commented on June 13, 2024

@oripwk @Anja05 any work arounds for http 400 error?

from kinesalite.

Anja05 avatar Anja05 commented on June 13, 2024

@oripwk @Anja05 any work arounds for http 400 error?
@prateeksinghal10
Hope this is resolved!! If not please check your polling configuration. Please set up your polling configuration if it is not set. In localstack you have to explicitly specify to poll the stream
//Sample
final PollingConfig pollingConfig =
new PollingConfig(awsKinesisConfiguration.getStreamName(), kinesisAsyncClient);
retrievalConfig.retrievalSpecificConfig(pollingConfig);

from kinesalite.

upanshu21 avatar upanshu21 commented on June 13, 2024

@oripwk @Anja05 any work arounds for http 400 error?
@prateeksinghal10
Hope this is resolved!! If not please check your polling configuration. Please set up your polling configuration if it is not set. In localstack you have to explicitly specify to poll the stream
//Sample
final PollingConfig pollingConfig =
new PollingConfig(awsKinesisConfiguration.getStreamName(), kinesisAsyncClient);
retrievalConfig.retrievalSpecificConfig(pollingConfig);

@Anja05 where do we specify the polling config?

from kinesalite.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.