etspaceman / kinesis-mock
A Kinesis Mock using Http4s
License: MIT License
The HTTP Service is a stub right now and needs to actually be compliant with the API offered by Kinesis.
Kinesalite does most of this in their top level index.js file:
https://github.com/mhart/kinesalite/blob/master/index.js
Things that we should be considering
If EnforceConsumerDeletion=true is passed on DeleteStream, the system should delete any existing stream consumer entities.
Instead, it seems that kinesis-mock raises an exception if EnforceConsumerDeletion is set and consumers are present:
@etspaceman Please let me know if we can help out with this change. Thanks!
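The behaviour requested above can be sketched as follows. This is a minimal illustration in Python, not kinesis-mock's actual code: `delete_stream`, the `streams` dictionary and the local `ResourceInUseException` class are hypothetical names used only to show the intended semantics of the flag.

```python
class ResourceInUseException(Exception):
    """Hypothetical stand-in for the Kinesis error of the same name."""


def delete_stream(streams, name, enforce_consumer_deletion=False):
    """Delete `name` from `streams` (a dict of stream name -> consumer names)."""
    consumers = streams[name]
    # Without the flag, a stream that still has consumers must not be deleted.
    if consumers and not enforce_consumer_deletion:
        raise ResourceInUseException(f"Stream {name} has registered consumers")
    # With the flag set, the consumers are removed together with the stream.
    del streams[name]


streams = {"orders": ["consumer-a", "consumer-b"]}
delete_stream(streams, "orders", enforce_consumer_deletion=True)
```

The point of the sketch is that the flag should switch the call from "fail because consumers exist" to "succeed and remove the consumers", never the other way round.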
It might be possible to build kinesis-mock using scalajs instead of graal. There are some pros and cons to this. This would allow kinesis-mock executables to have easier cross-platform integration, as you would not need to cross-compile it for every single platform. A TLS server should also perform better in NodeJS rather than Graal. However, this would also likely lead to a larger executable and docker-image. And performance would be something to keep an eye on.
To do this, certain pieces would need to be rewritten so that all included dependencies are scalajs compliant. For example, the AWS SDK utilities are brought in for MD5 hashing. We would have to use a separate library or write our own hashing routine for this to work properly.
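For context on what the hashing routine has to do: Kinesis maps a partition key to a 128-bit integer by taking its MD5 digest. A rough sketch of that mapping, written in Python purely for illustration (the function name `partition_key_to_hash_key` is made up here, and this is not the scalajs replacement itself):

```python
import hashlib


def partition_key_to_hash_key(partition_key: str) -> int:
    """Interpret the MD5 digest of the UTF-8 partition key as an unsigned 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


example_hash_key = partition_key_to_hash_key("id")
```

Any replacement library would need to produce the same digest bytes for the same input, so that records keep landing on the same shard.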
Kinesis-Mock is distributed as a Docker image today. It would be cool to be able to build graalvm native images for all platforms (windows, mac, linux) and provide distributions for each.
Describe the bug
Quotes used in KINESIS_INITIALIZE_STREAMS will cause a parsing error. This is new behavior which perhaps was triggered by one of the underlying component upgrades.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
The surrounding quote characters in environment variables should be ignored.
Additional context
This worked with a 3 month old localstack image, and only broke recently, potentially in conjunction with the requirement for the region to be included in the stream name (that was not required in the previous image either)
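One way the expected behaviour could look, sketched in Python. The helper name `strip_surrounding_quotes` and the sample value `my-stream:1` are assumptions made for this example and are not taken from kinesis-mock.

```python
def strip_surrounding_quotes(value: str) -> str:
    """Drop one pair of matching quotes wrapped around an environment variable value."""
    value = value.strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        return value[1:-1]
    return value


# Hypothetical value as it might arrive when the quotes are passed through literally.
cleaned = strip_surrounding_quotes('"my-stream:1"')
```

Only a matching pair at both ends is removed, so a value that merely contains a quote character is left untouched.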
Some of our users in LocalStack have been facing Kinesis startup issues - especially on MacBook Pro M1 laptops, caused by this error during startup:
qemu: uncaught target signal 11 (Segmentation fault) - core dumped
Not entirely sure what would be the best way to debug this, as I currently don't have access to an M1 machine. One option would be to catch this error on the LocalStack side, and then fall back to the legacy Kinesis provider (kinesalite), but ideally it would be great to see the root cause of this issue fixed properly. Otherwise, we run the risk of diverging too much with the different implementations we're using in LocalStack.
Please let us know if there's anything we can help out with. Thanks for your help!
Describe the bug
LocalStack, which uses kinesis-mock in its Docker container, is reporting to my Elixir ex_aws-based application that "Authorization header requires 'Signature' parameter. Authorization header requires 'SignedHeaders' parameter." (linked LocalStack issue)
It seems parsing expects a space between , and the next parameter when this is not required by AWS.
To Reproduce
Steps to reproduce the behavior:
java -jar ./kinesis-mock.jar (exposed 4568 for plain HTTP requests)
curl -v -X POST -H "Authorization: AWS4-HMAC-SHA256 Credential=test/20221219/us-east-1/kinesis/aws4_request,SignedHeaders=content-type;host;x-amz-date;x-amz-target,Signature=ba9705e002b4f224d08c908606f0076a7f4f53bcc68315897764c587c7e5ffb5" -H "content-type: application/x-amz-json-1.1" -H "host: localhost:4566" -H "x-amz-date: 20221219T221436Z" -H "x-amz-target: Kinesis_20131202.ListStreams" http://127.0.0.1:4500 -d '{}'
Expected behavior
No error. (We could probably split on , and trim the whitespace.)
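The "split on , and trim" idea could look roughly like this. It is a Python sketch rather than the Scala parser used by kinesis-mock, and `parse_authorization` is a hypothetical helper name.

```python
def parse_authorization(header: str) -> dict:
    """Split a SigV4 Authorization header into its algorithm and key=value parameters."""
    algorithm, _, rest = header.strip().partition(" ")
    params = {}
    for part in rest.split(","):
        # Trimming makes "a=1,b=2" and "a=1, b=2" parse identically.
        key, sep, value = part.strip().partition("=")
        if sep:
            params[key] = value
    return {"Algorithm": algorithm, **params}


header = (
    "AWS4-HMAC-SHA256 "
    "Credential=test/20221219/us-east-1/kinesis/aws4_request,"
    "SignedHeaders=content-type;host;x-amz-date;x-amz-target,"
    "Signature=ba9705e002b4f224d08c908606f0076a7f4f53bcc68315897764c587c7e5ffb5"
)
parsed = parse_authorization(header)
```

Because each part is stripped before it is split on `=`, the presence or absence of a space after the comma no longer matters.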
Describe the bug
When trying to receive published, base64-encoded data using the AWS CLI v2, kinesis-mock reports "Illegal base64 character 22".
To Reproduce
Steps to reproduce the behavior:
docker run --rm -p 4566:4566 --name kinesis-mock -e KINESIS_MOCK_PLAIN_PORT=4566 ghcr.io/etspaceman/kinesis-mock:0.4.4
#!/bin/bash
set -euxo pipefail
AWS=${AWS:-awslocal}
KINESIS_STREAM_NAME=test-stream
${AWS} kinesis create-stream --stream-name $KINESIS_STREAM_NAME --shard-count 1 --stream-mode-details StreamMode=PROVISIONED
base64_content=$(echo '{"test": "data"}' | base64)
${AWS} kinesis put-record \
--stream-name $KINESIS_STREAM_NAME \
--data ${base64_content} --partition-key id
SHARD_ITERATOR=$(${AWS} kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name $KINESIS_STREAM_NAME --query 'ShardIterator');
${AWS} kinesis get-records --shard-iterator $SHARD_ITERATOR
Using aws cli v2:
aws-cli/2.8.3 Python/3.9.11 Linux/6.6.6-arch1-1 exe/x86_64.arch prompt/off
The CLI documentation for the aws cli v2 specifies (https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/put-record.html):
The formatting style to be used for binary blobs. The default format is base64. The base64 format expects binary blobs to be provided as a base64 encoded string.
I confirmed that the payload sent to kinesis is indeed base64 encoded once.
Also, I validated the case against AWS.
Expected behavior
Records get returned properly.
Error logs
[error] kinesis.mock.KinesisMockService$ 2023-12-12T16:56:24.557165Z contextId=fa937ab2-8e90-46c1-a35a-081f9b30f185 Error servicing request: POST / from 172.17.0.1
java.lang.IllegalArgumentException: Illegal base64 character 22
at oy.loa(/main.js:506:31)
at mra(/main.js:507:249)
at hqb(/main.js:3470:21)
at {anonymous}()(/main.js:3320:72)
at w.h(/main.js:2942:262)
at z7(/main.js:4433:246)
at e.Nk(/main.js:4461:328)
at ZN.Nk(/main.js:1376:197)
at {anonymous}()(/main.js:3605:53)
at u.ra(/main.js:2942:45)
Additional context
This first occurred as a bug within LocalStack, but we verified it against kinesis-mock with the given instructions directly.
Happy to help with anything; however, I would appreciate being pointed in the right direction, as my Scala is rather bad.
EDIT: aws-mock => kinesis-mock. Somehow messed this up :)
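A note on reading the error message, with the caveat that it rests on an assumption: if the number is printed in hexadecimal, as the JDK's java.util.Base64 decoder does, then 22 is 0x22, the double-quote character. That would suggest a value still wrapped in JSON quotes reached the decoder. The Python sketch below only illustrates that hypothesis; it is not a confirmed diagnosis of this bug.

```python
import base64
import binascii

# 0x22 is the ASCII code of the double-quote character.
offending = chr(0x22)

payload = base64.b64encode(b'{"test": "data"}').decode("ascii")
quoted = f'"{payload}"'  # the same payload, still wrapped in JSON quotes


def strict_decode(text: str) -> bytes:
    """Decode base64 and reject any character outside the alphabet."""
    return base64.b64decode(text, validate=True)


try:
    strict_decode(quoted)
    quoted_ok = True
except binascii.Error:
    quoted_ok = False
```

A strict decoder accepts the bare payload and rejects the quoted one, which matches the shape of the reported failure.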
Needs a CI pipeline. Ideally it would be like my scalacheck-faker repo setup:
https://github.com/etspaceman/scalacheck-faker/tree/master/.github/workflows
Must support all of the following:
Hello!
Describe the bug
When a shard is split, the HashKeyRanges of the newly created shards have their starting and ending hash keys swapped, and new items cannot be put on the stream. See
https://github.com/etspaceman/kinesis-mock/blob/main/src/main/scala/kinesis/mock/api/SplitShardRequest.scala#L78-L81
https://github.com/etspaceman/kinesis-mock/blob/main/src/main/scala/kinesis/mock/models/HashKeyRange.scala#L8
I'd be happy to open a PR with a fix.
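For reference, the expected ranges after a split can be sketched like this. It is a Python illustration with a hypothetical `split_hash_key_range` helper, not the code from SplitShardRequest.scala: the first child keeps the parent's starting key and the second child starts at the new starting hash key.

```python
def split_hash_key_range(start: int, end: int, new_starting_hash_key: int):
    """Return the (start, end) hash key ranges of the two child shards."""
    if not start < new_starting_hash_key <= end:
        raise ValueError("new starting hash key must fall inside the parent range")
    first_child = (start, new_starting_hash_key - 1)
    second_child = (new_starting_hash_key, end)
    return first_child, second_child


# Splitting the full 128-bit key space in the middle.
first, second = split_hash_key_range(0, 2 ** 128 - 1, 2 ** 127)
```

In both children the starting key is less than or equal to the ending key, and together they cover the parent range without a gap or overlap.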
Using PERSIST_PATH with empty persistence folder seems to break startup.
Example from LocalStack:
DATA_DIR=/tmp/datadir1 localstack start
...
[ioapp-compute-1] INFO 2021-06-21 23:02:49 k.m.KinesisMockService contextId=091765b7-d2d4-11eb-ae88-35189dcfe17a, cacheConfig={"awsAccountId":"000000000000","awsRegion":"eu-central-1","createStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deleteStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deregisterStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"initializeStreams":null,"mergeShardsDuration":{"length":500,"unit":"MILLISECONDS"},"persistConfig":{"fileName":"kinesis-data.json","interval":{"length":5,"unit":"SECONDS"},"loadIfExists":true,"path":"/tmp/ls-data1/kinesis","shouldPersist":true},"registerStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"shardLimit":100,"splitShardDuration":{"length":500,"unit":"MILLISECONDS"},"startStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"stopStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"updateShardCountDuration":{"length":500,"unit":"MILLISECONDS"}} - Logging Cache Config
os.PathError$InvalidSegment: [] is not a valid path segment. OS-Lib does not allow empty path segments If you are dealing with path-strings coming from external sources, use the Path(...) or RelPath(...) constructor calls to convert them.
at os.BasePath$.fail$1(Path.scala:120)
at os.BasePath$.checkSegment(Path.scala:137)
at os.PathChunk$StringPathChunk.<init>(Path.scala:21)
at os.PathChunk$.StringPathChunk(Path.scala:20)
at kinesis.mock.cache.PersistConfig.createPath(PersistConfig.scala:25)
at kinesis.mock.cache.PersistConfig.osPath(PersistConfig.scala:34)
at kinesis.mock.cache.Cache$.$anonfun$loadFromFile$1(Cache.scala:1245)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$6(KinesisMockService.scala:34)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$4(KinesisMockService.scala:29)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at delay$extension @ pureconfig.module.catseffect.package$.loadF(package.scala:51)
at subflatMap @ pureconfig.module.catseffect.package$.loadF(package.scala:52)
at leftMap @ pureconfig.module.catseffect.package$.loadF(package.scala:53)
at rethrowT @ pureconfig.module.catseffect.package$.loadF(package.scala:54)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$3(KinesisMockService.scala:27)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$1(KinesisMockService.scala:26)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at use @ kinesis.mock.KinesisMockService$.run(KinesisMockService.scala:24)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
@etspaceman Any ideas? Please let me know if you need any more details. Thanks!
Kinesis-Mock supports namespacing over regions which was introduced in #187
It would be very useful to build on that and allow namespacing over AWS accounts. This can be based on the Access Key ID retrieved from the Authorization header.
Some possible approaches:
AWS_ACCOUNT_ID for all outputs (this approach is taken by DynamoDB Local)
The error responses are prefixed with kinesis.mock. which makes them incompatible with expected error codes in at least some AWS SDKs. This means we can't replace kinesalite in our tests without code modifications, which shouldn't be necessary here.
E.g., we have the following Go code (following the suggested error handling guide):
_, err := k.DescribeStream(&kinesis.DescribeStreamInput{
	StreamName: aws.String(streamName),
})
if err != nil {
	if awsErr, ok := err.(awserr.Error); ok {
		switch awsErr.Code() {
		case kinesis.ErrCodeResourceNotFoundException:
			// create stream
		default:
			// Pass the code as an argument instead of building the format string.
			log.Printf("Unknown error: %s", awsErr.Code())
		}
	}
}
which is outputting:
Unknown error: kinesis.mock.ResourceNotFoundException
This seems to be an issue here -- not done Java for a while, but I believe just replacing with getSimpleName would fix it.
Thanks.
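The effect of the suggested change, illustrated in Python rather than in the project's own code (`simple_error_code` is a hypothetical helper): the error code sent to clients should be the unqualified type name.

```python
def simple_error_code(qualified_name: str) -> str:
    """Return the last dot-separated segment of a qualified type name."""
    return qualified_name.rsplit(".", 1)[-1]


code = simple_error_code("kinesis.mock.ResourceNotFoundException")
```

With the prefix removed, a switch on the SDK's error code constants would match again.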
I am trying to start the service as per the steps mentioned in the Readme guide. But the container is not starting and I am seeing the below exception on the console:
qemu: uncaught target signal 11 (Segmentation fault) - core dumped
I am using Apple M1 Pro.
It is mentioned that the file kinesis-mock-macos-amd64-dynamic will help, but how can I use it?
Can someone help me start the container successfully?
This project is a good idea but it's of limited utility due to being written in Scala instead of a real language. It should be switched to one of the following
Describe the bug
With the latest releases, the PERSIST_PATH is always used as being relative to the current working directory (even though it starts with a /).
To Reproduce
#!/bin/bash
# start with absolute path
SHOULD_PERSIST_DATA=true PERSIST_PATH="/tmp/state/kinesis" PERSIST_FILE_NAME=state.json PERSIST_INTERVAL=5s LOG_LEVEL=DEBUG node main.js &
# save the PID
KINESIS_PID=$!
# create some data
aws --endpoint-url="http://localhost:4568/" kinesis create-stream --stream-name test
# make sure everything is persisted
sleep 10
# send the interrupt
kill -2 $KINESIS_PID
# the absolute directory does not exist
ls /tmp/state/kinesis
# the relative one does
ls tmp/state/kinesis
Expected behavior
When a PERSIST_PATH with a leading slash is set, the path should be used as an absolute path (and not relative to the current working directory).
Additional context
I was facing this issue when testing our persistence feature for LocalStack, based on localstack/localstack#8790.
A workaround was implemented for localstack/localstack#8790 with localstack/localstack@e5944c2
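The expected resolution rule can be written down in a few lines. This is a Python sketch and `resolve_persist_path` is a hypothetical name; the actual fix would live in kinesis-mock's path handling.

```python
from pathlib import PurePosixPath


def resolve_persist_path(persist_path: str, cwd: str) -> PurePosixPath:
    """Keep absolute paths as they are; resolve relative ones against `cwd`."""
    path = PurePosixPath(persist_path)
    return path if path.is_absolute() else PurePosixPath(cwd) / path


absolute = resolve_persist_path("/tmp/state/kinesis", "/app")
relative = resolve_persist_path("tmp/state/kinesis", "/app")
```

Only the second call should end up below the working directory.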
The blaze server for the 4567 port is configured to use Http2 via Blaze but that seems to be bugged:
Chris Davenport has mentioned that the Ember backend will support Http2 soonish so we can wait until this backend is released for that support.
I have a unit test that checks that the shard IDs as returned by PutRecords are as predicted. Since localstack 0.12.13 this test is failing. The correct shard IDs are in the response, just in a different order. Is it possible that the order of the records in PutRecordsResponse is incorrect w.r.t. the requests?
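For reference, the Kinesis API correlates PutRecords response entries with request entries by position. A sketch of how records can be grouped per shard for storage while the response keeps request order is shown below. It is written in Python, and `put_records` and the `route` callback are hypothetical names.

```python
from collections import defaultdict


def put_records(partition_keys, route):
    """Store keys per shard, but return one result per request entry, in request order."""
    by_shard = defaultdict(list)
    results = [None] * len(partition_keys)
    for index, key in enumerate(partition_keys):
        shard_id = route(key)
        by_shard[shard_id].append(key)
        # The result is written at the index of the request entry it belongs to.
        results[index] = {"ShardId": shard_id}
    return results, dict(by_shard)


def route(key):
    """Toy routing rule for the example only."""
    return "shardId-000000000001" if key == "b" else "shardId-000000000000"


results, stored = put_records(["a", "b", "c"], route)
```

Building the response from the per-shard groups instead would return the same shard IDs in a different order, which is the symptom described above.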
Describe the bug
When Kinesis is under high load with large records (e.g. 100s of records of ~256 kB each), it slows down and eventually fails with an out-of-memory error.
To Reproduce
Steps to reproduce the behavior:
docker container run \
-d \
--name localstack \
--rm \
-p 4566-4583:4566-4583 \
-p 4510-4559:4510-4559 \
-v /var/run/docker.sock:/var/run/docker.sock \
-v "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" \
-e SERVICES=s3,kinesis,iam,dynamodb,dynamodbstreams,lambda \
--platform linux/amd64 \
localstack/localstack
aws kinesis create-stream \
--stream-name captures-stream \
--shard-count 1 \
--endpoint-url http://localhost:4566 \
--region sa-east-1
aws kinesis put-record \
--stream-name captures-stream \
--partition-key 1 \
--data "Hi, how are you?" \
--cli-binary-format raw-in-base64-out \
--endpoint-url http://localhost:4566 \
--region sa-east-1
Expected behavior
I expect to continue sending put-record data with no freeze
Describe the bug
The CreateStreamRequest defines the shardCount as mandatory / not optional.
This results in serialization errors when invoking create-stream if the shard-count is not set.
A request against AWS without the shardCount works:
aws kinesis create-stream --stream-name test
It's also marked as optional in the AWS CLI docs on create-stream: https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html
To Reproduce
$ docker pull ghcr.io/etspaceman/kinesis-mock:0.2.4
$ docker run -d -p 4568:4568 ghcr.io/etspaceman/kinesis-mock:0.2.4
$ aws --endpoint-url=http://localhost:4568 kinesis create-stream --stream-name test
An error occurred (SerializationException) when calling the CreateStream operation: Invalid message body: Could not decode JSON: {
"StreamName" : "test"
}
Expected behavior
The shardCount should be optional, i.e. the following request should successfully create a stream:
aws --endpoint-url=http://localhost:4568 kinesis create-stream --stream-name test
Additional context
Thanks for the great tool! Please let me know if any more info would be helpful.
I could also try to file a PR if the issue is confirmed.
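What "optional" means for the decoder can be sketched as follows. It is a Python illustration with a hypothetical `decode_create_stream` function; which shard count to use when none is given is deliberately left to the caller, since that choice is not stated here.

```python
import json


def decode_create_stream(body: str) -> dict:
    """Decode a CreateStream request body, tolerating a missing ShardCount."""
    payload = json.loads(body)
    return {
        "StreamName": payload["StreamName"],     # still required
        "ShardCount": payload.get("ShardCount"),  # None when omitted
    }


request = decode_create_stream('{"StreamName": "test"}')
```

The request body from the error message above then decodes cleanly instead of raising a serialization error.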
Hi there,
First of all, great piece of kit. We have used kinesis-mock along with localstack and test-containers extensively with great effect.
Now, to the issue: I am noticing when submitting PutRecord requests in parallel that (even though all receive an HTTP 200 response) some are not making it out the other side on subsequent GetRecord requests.
Things I've tried / noticed / investigated:
PutRecord requests in serial, everything works as expected
localstack logging and investigated the in/out logging. I can see all PutRecord requests returning HTTP 200.
numRecords the more likely records will go missing. A value of 10 will fail 50% of the time, a value of 100 will fail 100% of the time.
I've written a little unit test that demonstrates the same symptoms (snippet at bottom of post) with a 2-sharded stream.
If I can provide any more information (trace logs from testcontainers etc.), please ask. Thanks!
@Test
public void testKinesis() throws Exception {
int numRecords = 10;
// Submit 10 PutRecord requests into a pool of 2 worker threads.
CountDownLatch latch = new CountDownLatch(numRecords);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, numRecords).forEach(i -> executor.submit( () -> {
String data = i + "";
PutRecordResult putRecordResult = kinesis.putRecord(
new PutRecordRequest()
.withData(wrap(data.getBytes()))
.withPartitionKey(data)
.withStreamName("test_stream"));
latch.countDown();
if (putRecordResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
log.info("Put of " + i + " FAILED. Response Code: " + putRecordResult.getSdkHttpMetadata().getHttpStatusCode());
}
}));
// Wait for all the work to be completed
latch.await();
// Get a couple of iterators
List<String> shardIds = kinesis.describeStream("test_stream").getStreamDescription().getShards()
.stream()
.map(Shard::getShardId)
.collect(toList());
List<String> shardIterators = shardIds
.stream()
.map(shardId -> kinesis.getShardIterator(new GetShardIteratorRequest()
.withShardIteratorType(TRIM_HORIZON)
.withShardId(shardId)
.withStreamName("test_stream")).getShardIterator())
.collect(toList());
// GetRecords
List<String> requests = Collections.synchronizedList(new ArrayList<>());
shardIterators.forEach(shardIterator -> {
GetRecordsResult getRecordsResult;
do {
getRecordsResult = kinesis.getRecords(new GetRecordsRequest()
.withLimit(100)
.withShardIterator(shardIterator));
if (getRecordsResult.getRecords().size() > 0) {
requests.addAll(getRecordsResult.getRecords()
.stream()
.flatMap(record -> Stream.of(new String(record.getData().array())))
.collect(toList()));
shardIterator = getRecordsResult.getNextShardIterator();
}
} while (getRecordsResult.getRecords().size() > 0);
});
log.info("Collected (" + requests.size() + ") " + requests);
assertThat(requests.size()).isEqualTo(numRecords);
}
// Associated Spring setup code. There are a couple of other setup
// bits missing from this snippet (call to create stream etc.) for brevity.
@Bean(initMethod = "start", destroyMethod = "stop")
public GenericContainer localstack() {
return new GenericContainer("localstack/localstack:latest")
.withEnv("DEFAULT_REGION", "ap-southeast-2")
.withEnv("SERVICES", "dynamodb,kinesis")
.withEnv("START_WEB", "0")
.withEnv("LS_LOG", "DEBUG")
.waitingFor(forLogMessage(".*Ready.*", 1));
// Stream is created with 2 shards
}
Version: 0.1.2
Steps to reproduce:
SHOULD_PERSIST_DATA=true PERSIST_PATH="/tmp/kinesis-mock" ./kinesis-mock-linux-amd64-static
I would expect this to create the necessary kinesis-data.json in the directory, like it does when PERSIST_PATH is not specified. However, it fails with:
thomas@om ~/Downloads % SHOULD_PERSIST_DATA=true PERSIST_PATH="/tmp/kinesis-mock" ./kinesis-mock-linux-amd64-static
[ioapp-compute-1] INFO 2021-06-28 20:31:26,479 k.m.KinesisMockService contextId=0c3cea9f-d83f-11eb-a703-395ba624dc2a, cacheConfig={"awsAccountId":"000000000000","awsRegion":"us-east-1","createStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deleteStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deregisterStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"initializeStreams":null,"mergeShardsDuration":{"length":500,"unit":"MILLISECONDS"},"persistConfig":{"fileName":"kinesis-data.json","interval":{"length":5,"unit":"SECONDS"},"loadIfExists":true,"path":"/tmp/kinesis-mock","shouldPersist":true},"registerStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"shardLimit":50,"splitShardDuration":{"length":500,"unit":"MILLISECONDS"},"startStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"stopStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"updateShardCountDuration":{"length":500,"unit":"MILLISECONDS"}} - Logging Cache Config
java.io.FileNotFoundException: /tmp/kinesis-mock/kinesis-data.json (No such file or directory)
at com.oracle.svm.jni.JNIJavaCallWrappers.jniInvoke_VA_LIST:Ljava_io_FileNotFoundException_2_0002e_0003cinit_0003e_00028Ljava_lang_String_2Ljava_lang_String_2_00029V(JNIJavaCallWrappers.java:0)
at java.io.FileInputStream.open0(FileInputStream.java)
at java.io.FileInputStream.open(FileInputStream.java:219)
at java.io.FileInputStream.<init>(FileInputStream.java:157)
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1029)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3076)
at kinesis.mock.cache.Cache$.$anonfun$loadFromFile$3(Cache.scala:1247)
at flatMap @ kinesis.mock.cache.Cache$.$anonfun$loadFromFile$2(Cache.scala:1247)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$6(KinesisMockService.scala:34)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$.org$typelevel$log4cats$slf4j$internal$Slf4jLoggerInternal$$contextLog(Slf4jLoggerInternal.scala:55)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$4(KinesisMockService.scala:29)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at delay$extension @ pureconfig.module.catseffect.package$.loadF(package.scala:51)
at subflatMap @ pureconfig.module.catseffect.package$.loadF(package.scala:52)
at leftMap @ pureconfig.module.catseffect.package$.loadF(package.scala:53)
at rethrowT @ pureconfig.module.catseffect.package$.loadF(package.scala:54)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$3(KinesisMockService.scala:27)
at flatMap @ kinesis.mock.KinesisMockService$.$anonfun$run$1(KinesisMockService.scala:26)
at run @ com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at use @ kinesis.mock.KinesisMockService$.run(KinesisMockService.scala:24)
This currently breaks localstack with DATA_DIR enabled.
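A tolerant start-up could look roughly like this: create the directory if needed and treat a missing state file as "nothing to load". This is a Python sketch with a hypothetical `load_state` function; only the file name `kinesis-data.json` is taken from the log above.

```python
import json
from pathlib import Path


def load_state(directory: Path, file_name: str = "kinesis-data.json") -> dict:
    """Load persisted state, or return an empty state on the first start."""
    directory.mkdir(parents=True, exist_ok=True)
    state_file = directory / file_name
    if not state_file.exists():
        # First start with this path: there is nothing to restore yet.
        return {}
    return json.loads(state_file.read_text())
```

With this shape, pointing PERSIST_PATH at a fresh directory starts with empty state instead of failing with FileNotFoundException.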
Hi, I'm currently trying to fix this issue (localstack/localstack#4522), but it seems to me that the problem originates in this project due to the lack of support for multiple regions.
AWS_DEFAULT_REGION=us-east-2 aws --endpoint-url http://localhost:4566 kinesis create-stream --stream-name kinesisStream --shard-count 1
AWS_DEFAULT_REGION=us-east-2 aws --endpoint-url http://localhost:4566 kinesis describe-stream --stream-name kinesisStream | jq .StreamDescription.StreamAR
arn:aws:kinesis:us-east-2:074255357339:stream/kinesisStream
"arn:aws:kinesis:us-east-1:000000000000:stream/kinesisStream"
I understand that there's a variable for setting the default region, but the region could also be taken from the Authorization header of the createStreamRequest to see the current default region of the client.
Also, I tried to fix it myself, but I really have a hard time trying to understand functional programming code.
Thanks for any help you could provide.
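Reading the region from the request is possible because a SigV4 credential scope has the form access-key/date/region/service/aws4_request. A small Python sketch follows; `region_from_authorization` is a hypothetical helper and the sample header is made up for the example, with the signature elided.

```python
def region_from_authorization(header: str):
    """Return the region from the Credential scope of a SigV4 Authorization header."""
    for part in header.split(","):
        key, _, value = part.strip().partition("=")
        # The first part also carries the algorithm, e.g. "AWS4-HMAC-SHA256 Credential".
        if key.endswith("Credential"):
            scope = value.split("/")
            if len(scope) == 5:
                return scope[2]
    return None


# Hypothetical header for a client configured with us-east-2.
sample = (
    "AWS4-HMAC-SHA256 Credential=test/20221219/us-east-2/kinesis/aws4_request, "
    "SignedHeaders=host;x-amz-date, Signature=..."
)
region = region_from_authorization(sample)
```

The region found this way could then take precedence over the configured default when the stream ARN is built.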
The current design leverages set from the Ref cache, which can have race conditions with parallel requests. We should be using update instead. This will require quite a major refactor.
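The difference between the two styles can be shown with a toy model. This is a Python sketch in which a small `Ref` class with a lock stands in for an atomic reference; it only imitates the idea of cats-effect's Ref and is not its API. The interleaving is written out by hand so that the lost update is deterministic.

```python
import threading


class Ref:
    """Toy mutable reference; a lock stands in for an atomic compare-and-set."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        return self._value

    def set(self, value):
        with self._lock:
            self._value = value

    def update(self, f):
        # Read, modify and write happen as one step.
        with self._lock:
            self._value = f(self._value)


# get + set: both requests read the same snapshot, so one write is lost.
racy = Ref(())
snapshot_a = racy.get()
snapshot_b = racy.get()
racy.set(snapshot_a + ("record-a",))
racy.set(snapshot_b + ("record-b",))
lost = racy.get()

# update: each modification is applied to the latest value.
safe = Ref(())
safe.update(lambda records: records + ("record-a",))
safe.update(lambda records: records + ("record-b",))
kept = safe.get()
```

In the first half only one record survives, which is the same pattern as records going missing under parallel PutRecord calls.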
Might involve other regions as well
Example log extract:
2022-02-24T13:07:59.358:INFO:localstack.services.kinesis.kinesis_mock_server: pureconfig.error.ConfigReaderException: Cannot convert configuration to a kinesis.mock.cache.CacheConfig. Failures are:
2022-02-24T13:07:59.358:INFO:localstack.services.kinesis.kinesis_mock_server: at 'aws-region':
2022-02-24T13:07:59.358:INFO:localstack.services.kinesis.kinesis_mock_server: - (env variables) Cannot convert 'eu-west-3' to kinesis.mock.models.AwsRegion: .
Seems the Paris region (eu-west-3) is missing here: https://github.com/etspaceman/kinesis-mock/blob/main/src/main/scala/kinesis/mock/models/AwsRegion.scala
Right now the docker image uses an assembled jar for its execution. Ideally we'd like to use graal to have an executable that can be easily run outside of a docker environment and without a JDK, also with a smaller size.
Hi, we're sometimes seeing this log output when using kinesis-mock in LocalStack (under Mac OS, Docker version 3.5.2). It doesn't seem to have a noticeable effect from a functional point of view (the service is up and running and available), but would be nice if we can look into fixing this (or suppressing the error message). Please let me know if you need any additional information @etspaceman. Thanks!
14:44:41,037 |-ERROR in ch.qos.logback.core.joran.action.AppenderAction - Could not create an Appender of type [ch.qos.logback.classic.AsyncAppender]. ch.qos.logback.core.util.DynamicClassLoadingException: Failed to instantiate type ch.qos.logback.classic.AsyncAppender
at ch.qos.logback.core.util.DynamicClassLoadingException: Failed to instantiate type ch.qos.logback.classic.AsyncAppender
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassNameAndParameter(OptionHelper.java:69)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassName(OptionHelper.java:45)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassName(OptionHelper.java:34)
at at ch.qos.logback.core.joran.action.AppenderAction.begin(AppenderAction.java:52)
at at ch.qos.logback.core.joran.spi.Interpreter.callBeginAction(Interpreter.java:269)
at at ch.qos.logback.core.joran.spi.Interpreter.startElement(Interpreter.java:145)
at at ch.qos.logback.core.joran.spi.Interpreter.startElement(Interpreter.java:128)
at at ch.qos.logback.core.joran.spi.EventPlayer.play(EventPlayer.java:50)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:165)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:152)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:110)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:53)
at at ch.qos.logback.classic.util.ContextInitializer.configureByResource(ContextInitializer.java:75)
at at ch.qos.logback.classic.util.ContextInitializer.autoConfig(ContextInitializer.java:150)
at at org.slf4j.impl.StaticLoggerBinder.init(StaticLoggerBinder.java:84)
at at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:55)
at at com.oracle.svm.core.classinitialization.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:375)
at at com.oracle.svm.core.classinitialization.ClassInitializationInfo.initialize(ClassInitializationInfo.java:295)
at at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
at at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
at at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
at at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at at kinesis.mock.KinesisMockService$.$anonfun$run$2(KinesisMockService.scala:26)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:104)
at at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
at at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
at at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
at at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:183)
at at cats.effect.internals.IORunLoop$.restart(IORunLoop.scala:41)
at at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:48)
at at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:34)
at at cats.effect.internals.IOAsync$.$anonfun$apply$1(IOAsync.scala:37)
at at cats.effect.internals.IOAsync$.$anonfun$apply$1$adapted(IOAsync.scala:37)
at at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:447)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:156)
at at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:463)
at at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:484)
at at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
at at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at at java.lang.Thread.run(Thread.java:834)
at at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
Caused by: java.lang.ClassNotFoundException: ch.qos.logback.classic.AsyncAppender
at at com.oracle.svm.core.hub.ClassForNameSupport.forName(ClassForNameSupport.java:60)
at at java.lang.ClassLoader.loadClass(ClassLoader.java:281)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassNameAndParameter(OptionHelper.java:56)
at ... 54 common frames omitted
14:44:41,037 |-ERROR in ch.qos.logback.core.joran.spi.Interpreter@8:71 - ActionException in Action for tag [appender] ch.qos.logback.core.joran.spi.ActionException: ch.qos.logback.core.util.DynamicClassLoadingException: Failed to instantiate type ch.qos.logback.classic.AsyncAppender
at ch.qos.logback.core.joran.spi.ActionException: ch.qos.logback.core.util.DynamicClassLoadingException: Failed to instantiate type ch.qos.logback.classic.AsyncAppender
at at ch.qos.logback.core.joran.action.AppenderAction.begin(AppenderAction.java:76)
at at ch.qos.logback.core.joran.spi.Interpreter.callBeginAction(Interpreter.java:269)
at at ch.qos.logback.core.joran.spi.Interpreter.startElement(Interpreter.java:145)
at at ch.qos.logback.core.joran.spi.Interpreter.startElement(Interpreter.java:128)
at at ch.qos.logback.core.joran.spi.EventPlayer.play(EventPlayer.java:50)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:165)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:152)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:110)
at at ch.qos.logback.core.joran.GenericConfigurator.doConfigure(GenericConfigurator.java:53)
at at ch.qos.logback.classic.util.ContextInitializer.configureByResource(ContextInitializer.java:75)
at at ch.qos.logback.classic.util.ContextInitializer.autoConfig(ContextInitializer.java:150)
at at org.slf4j.impl.StaticLoggerBinder.init(StaticLoggerBinder.java:84)
at at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:55)
at at com.oracle.svm.core.classinitialization.ClassInitializationInfo.invokeClassInitializer(ClassInitializationInfo.java:375)
at at com.oracle.svm.core.classinitialization.ClassInitializationInfo.initialize(ClassInitializationInfo.java:295)
at at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
at at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
at at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
at at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at at kinesis.mock.KinesisMockService$.$anonfun$run$2(KinesisMockService.scala:26)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:104)
at at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
at at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
at at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
at at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
at at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:183)
at at cats.effect.internals.IORunLoop$.restart(IORunLoop.scala:41)
at at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:48)
at at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:34)
at at cats.effect.internals.IOAsync$.$anonfun$apply$1(IOAsync.scala:37)
at at cats.effect.internals.IOAsync$.$anonfun$apply$1$adapted(IOAsync.scala:37)
at at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:447)
at at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:156)
at at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:463)
at at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:484)
at at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
at at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at at java.lang.Thread.run(Thread.java:834)
at at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
at at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
Caused by: ch.qos.logback.core.util.DynamicClassLoadingException: Failed to instantiate type ch.qos.logback.classic.AsyncAppender
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassNameAndParameter(OptionHelper.java:69)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassName(OptionHelper.java:45)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassName(OptionHelper.java:34)
at at ch.qos.logback.core.joran.action.AppenderAction.begin(AppenderAction.java:52)
at ... 51 common frames omitted
Caused by: java.lang.ClassNotFoundException: ch.qos.logback.classic.AsyncAppender
at at com.oracle.svm.core.hub.ClassForNameSupport.forName(ClassForNameSupport.java:60)
at at java.lang.ClassLoader.loadClass(ClassLoader.java:281)
at at ch.qos.logback.core.util.OptionHelper.instantiateByClassNameAndParameter(OptionHelper.java:56)
at ... 54 common frames omitted
14:44:41,037 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to ERROR
We will need a Docker image to accomplish #5. Ideally we use Graal to run the service. The Docker image should be publicly accessible via Docker Hub.
Add unit tests for every cache interaction as well as all circe codecs.
Add functional tests for every API call. Functional tests should be accomplished using the Java AWS SDK, KCL and KPL.
GetRecords currently returns the same shard iterator that was passed. It should instead return a new shard iterator in the nextShardIterator part of the response. The current behavior causes issues with the DynamoDB Streams connector, which depends on receiving a new iterator.
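To illustrate why consumers depend on this, here is a minimal, self-contained Python sketch. It does not use kinesis-mock or the AWS SDK: `FakeShard`, its offset-based iterator encoding, and `read_all` are hypothetical names made up for this example. The only detail taken from the real API is that the response carries a `NextShardIterator` field.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeShard:
    """Hypothetical in-memory shard; iterators are just stringified offsets."""
    records: List[str] = field(default_factory=list)

    def get_records(self, shard_iterator: str, limit: int = 2) -> Dict[str, object]:
        start = int(shard_iterator)
        batch = self.records[start:start + limit]
        # Return an iterator that points past the records just returned,
        # rather than echoing the iterator that was passed in.
        return {
            "Records": batch,
            "NextShardIterator": str(start + len(batch)),
        }


def read_all(shard: FakeShard, shard_iterator: str, max_calls: int = 10) -> List[str]:
    """Polling loop in the style of a typical consumer: always follow NextShardIterator."""
    seen: List[str] = []
    for _ in range(max_calls):
        response = shard.get_records(shard_iterator)
        if not response["Records"]:
            break
        seen.extend(response["Records"])
        shard_iterator = response["NextShardIterator"]
    return seen


shard = FakeShard(records=["a", "b", "c", "d", "e"])
result = read_all(shard, "0")
```

If `get_records` echoed the incoming iterator instead, this loop would read the first batch over and over until `max_calls` ran out, which is the kind of failure a connector that follows the returned iterator would hit.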
This one is a challenge. It requires HTTP/2 server-push functionality, and the documentation on this is very limited. I believe Http4s has support for server push, but perhaps not in the way that SubscribeToShard needs it.
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
Without this support, users will have to use the PollingConsumer in their tests.
Logging should be implemented with each API call. We should expose a LOG_LEVEL environment variable that allows users to configure it. Log levels should follow the following:
Hello, there!
As part of the university research we are currently doing on the security of GitHub Actions, we noticed that one or more of the workflows in this repository reference vulnerable versions of third-party actions. As part of a disclosure process, we decided to open issues to notify the GitHub community.
Please note that there could be some false positives in our methodology, so not all of the opened issues may be valid. If that is the case, please let us know so that we can improve our approach. You can contact me directly by email: ikoishy [at] ncsu.edu
Thanks in advance
The vulnerability fix that is missing from the referenced action versions could be related to:
(1) a CVE fix
(2) an upgrade of a vulnerable dependency
(3) a fix for a secret leak, among others.
Please consider updating the reference to the action.
If you end up updating the reference, please let us know. We need the stats for the paper :-)
Describe the bug
Hi, a LocalStack user encountered an issue while using an Elixir library. You can see the whole bug report here: localstack/localstack#7358
It seems AWS is not always consistent with the format of the Authorization header. From the report:
I'm getting the following error:
Authorization header requires 'Signature' parameter. Authorization header requires 'SignedHeaders' parameter.
The Authorization header does have those parameters, but they're written like this:
AWS4-HMAC-SHA256 Credential=,SignedHeaders=<sign_head>,Signature=
instead of:
AWS4-HMAC-SHA256 Credential=, SignedHeaders=<sign_head>, Signature=
^ notice the extra space
which LocalStack seemingly expects, but shouldn't (?)
AWS' Authenticating Requests (AWS Signature Version 4) > Using an Authorization Header > Overview is not very clear either, since in the example they post, we find:
Authorization: AWS4-HMAC-SHA256
Credential=AKIAIOSFODNN7EXAMPLE/20130524/us-east-1/s3/aws4_request,
^ a space here
SignedHeaders=host;range;x-amz-date,
^ no space here
Signature=fe5f80f77d5fa3beca038a248ff027d0445342fe2855ddc963176630326f1024
The issue seems to stem from the parsing of the header here:
kinesis-mock/src/main/scala/kinesis/mock/KinesisMockRoutes.scala
Lines 180 to 185 in d5847bb
To Reproduce
From the user:
(Sorry if it does not work out of the box against kinesis-mock.)
curl -v -X POST -H "Authorization: AWS4-HMAC-SHA256 Credential=test/20221219/us-east-1/kinesis/aws4_request,SignedHeaders=content-type;host;x-amz-date;x-amz-target,Signature=ba9705e002b4f224d08c908606f0076a7f4f53bcc68315897764c587c7e5ffb5" -H "content-type: application/x-amz-json-1.1" -H "host: localhost:4566" -H "x-amz-date: 20221219T221436Z" -H "x-amz-target: Kinesis_20131202.ListStreams" http://127.0.0.1:4566
LocalStack logs:
2022-12-19T22:23:08.585 INFO --- [ asgi_gw_0] localstack.services.infra : Starting mock Kinesis service on http port 4566 ...
2022-12-19T22:23:09.394 INFO --- [-functhread9] l.s.k.kinesis_mock_server : [io-compute-blocker-2] INFO 2022-12-19 22:23:09,392 k.m.KinesisMockService contextId=b7aab196-7feb-11ed-b751-af810d2f0af3, cacheConfig={"awsAccountId":"000000000000","awsRegion":"us-east-1","createStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deleteStreamDuration":{"length":500,"unit":"MILLISECONDS"},"deregisterStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"initializeStreams":null,"mergeShardsDuration":{"length":500,"unit":"MILLISECONDS"},"onDemandStreamCountLimit":10,"persistConfig":{"fileName":"000000000000.json","interval":{"length":5,"unit":"SECONDS"},"loadIfExists":true,"path":"/var/lib/localstack/tmp/state/kinesis","shouldPersist":true},"registerStreamConsumerDuration":{"length":500,"unit":"MILLISECONDS"},"shardLimit":100,"splitShardDuration":{"length":500,"unit":"MILLISECONDS"},"startStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"stopStreamEncryptionDuration":{"length":500,"unit":"MILLISECONDS"},"updateShardCountDuration":{"length":500,"unit":"MILLISECONDS"}} - Logging Cache Config
2022-12-19T22:23:09.952 INFO --- [-functhread9] l.s.k.kinesis_mock_server : [io-compute-blocker-6] INFO 2022-12-19 22:23:09,952 k.m.KinesisMockService - Starting Kinesis TLS Mock Service on port 41233
2022-12-19T22:23:09.952 INFO --- [-functhread9] l.s.k.kinesis_mock_server : [io-compute-blocker-6] INFO 2022-12-19 22:23:09,952 k.m.KinesisMockService - Starting Kinesis Plain Mock Service on port 57369
2022-12-19T22:23:09.963 INFO --- [-functhread9] l.s.k.kinesis_mock_server : [io-compute-2] INFO 2022-12-19 22:23:09,962 k.m.KinesisMockService contextId=b801f957-7feb-11ed-b751-af810d2f0af3 - Starting persist data loop
2022-12-19T22:23:10.389 INFO --- [-functhread9] l.s.k.kinesis_mock_server : [io-compute-0] WARN 2022-12-19 22:23:10,389 k.m.KinesisMockRoutes missingAuthKeys=Signature, SignedHeaders, x-amzn-RequestId=b844327b-7feb-11ed-b751-af810d2f0af3, action=ListStreams, contextId=b844327a-7feb-11ed-b751-af810d2f0af3, x-amz-id-2=bW85lsdMwUNuQMLM/z6xbM6FqktvJQliiy3X9GZpBQOXYKlMOSJe3GnAAeeag7fAHrtCO8JY9jdspHFvxhOEr1A97tuC7wnd, contentType=application/x-amz-json-1.1 - Some required information was not provied with the authorization header
2022-12-19T22:23:10.396 INFO --- [ asgi_gw_0] localstack.request.aws : AWS kinesis.ListStreams => 400 (IncompleteSignatureException)
2022-12-19T22:24:01.014 INFO --- [ asgi_gw_1] localstack.request.aws : AWS kinesis.ListStreams => 200
Expected behavior
Being able to parse both Authorization header formats (with and without a space after each comma).
And again, thanks a lot for your work!
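As a rough illustration of the expected behavior, the sketch below parses the header while tolerating any amount of whitespace (including none) around the separating commas. It is written in Python for brevity and is not the kinesis-mock implementation, whose referenced lines are not reproduced here. The function names `parse_authorization` and `missing_auth_keys` are hypothetical, and the required keys are taken from the error message in the report. The header value is the one from the `curl` command above.

```python
import re
from typing import Dict, List

# Keys that the error message in the report says must be present.
REQUIRED_KEYS = ("Credential", "SignedHeaders", "Signature")


def parse_authorization(header: str) -> Dict[str, str]:
    """Split a SigV4-style Authorization header into its key=value parameters."""
    algorithm, _, params = header.strip().partition(" ")
    parsed = {"Algorithm": algorithm}
    # Split on commas and tolerate any (or no) whitespace around them.
    for part in re.split(r"\s*,\s*", params.strip()):
        key, sep, value = part.partition("=")
        if sep:
            parsed[key.strip()] = value.strip()
    return parsed


def missing_auth_keys(header: str) -> List[str]:
    """Return the required keys that are absent or empty in the header."""
    parsed = parse_authorization(header)
    return [key for key in REQUIRED_KEYS if not parsed.get(key)]


compact = (
    "AWS4-HMAC-SHA256 "
    "Credential=test/20221219/us-east-1/kinesis/aws4_request,"
    "SignedHeaders=content-type;host;x-amz-date;x-amz-target,"
    "Signature=ba9705e002b4f224d08c908606f0076a7f4f53bcc68315897764c587c7e5ffb5"
)
# Same header, but with a space after each comma.
spaced = compact.replace(",", ", ")
```

With this approach, both `compact` and `spaced` produce the same parameter map, so neither would be reported as missing `Signature` or `SignedHeaders`.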