
kafka-connect-dynamodb's Introduction

kafka-connect-dynamodb is a Kafka Connector for loading data to and from Amazon DynamoDB.

It is implemented using the AWS Java SDK for DynamoDB. For authentication, the DefaultAWSCredentialsProviderChain is used.
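
Because the DefaultAWSCredentialsProviderChain is used, credentials can be supplied through any of its standard mechanisms. For example (one possible setup, not specific to this connector), the standard AWS environment variables can be exported before starting the Connect worker:

$ export AWS_ACCESS_KEY_ID=<your-access-key-id>
$ export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>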

Building

Run:

$ mvn clean package

The connector JAR and the JARs it depends on will then be in target/kafka-connect-dynamodb-$version-SNAPSHOT-package/share/java/kafka-connect-dynamodb/*.

To create an uber JAR:

$ mvn -P standalone clean package

The uber JAR will be created at target/kafka-connect-dynamodb-$version-SNAPSHOT-standalone.jar.
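
To make the connector visible to a Kafka Connect worker, one option (a sketch, assuming a Connect version with plugin isolation, i.e. Apache Kafka 0.11.0 or later) is to point plugin.path in the worker configuration at the directory containing the packaged share/java contents; older versions require adding the JARs to the CLASSPATH instead.

plugin.path=/path/to/kafka-connect-dynamodb/target/kafka-connect-dynamodb-$version-SNAPSHOT-package/share/java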

Sink Connector

Example configuration

Ingest the orders topic to a DynamoDB table of the same name in the specified region:

name=dynamodb-sink-test
topics=orders
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-west-2
ignore.record.key=true
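
As a sketch, these properties (saved to a file, say dynamodb-sink.properties -- a made-up name) can be loaded into a standalone Connect worker alongside the usual worker configuration:

$ bin/connect-standalone.sh config/connect-standalone.properties dynamodb-sink.properties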

Record conversion

Refer to DynamoDB Data Types.

At the top level, the Kafka record key or value must either convert to the DynamoDB Map data type, or the top.key.attribute or top.value.attribute configuration option (as applicable) must be set, so that the converted value can be hoisted into the DynamoDB record under that attribute name.

Schema present

Connect Schema Type                                     DynamoDB
INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, Decimal    Number
BOOLEAN                                                 Boolean
BYTES                                                   Binary
STRING                                                  String
ARRAY                                                   List
MAP [1], STRUCT                                         Map

[1] Map keys must be primitive types, and cannot be optional.

null values for optional schemas are translated to the Null type.
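
For illustration (a made-up record, not taken from the project's documentation), a record value with a STRUCT schema containing an INT64 id, a STRING name and a BOOLEAN active flag would convert to the following DynamoDB attributes, per the table above:

Kafka record value:    { "id": 42, "name": "alice", "active": true }
DynamoDB item:         { "id": {"N": "42"}, "name": {"S": "alice"}, "active": {"BOOL": true} }

With top.value.attribute=payload, the same Map would instead be nested under a single payload attribute of the item.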

Schemaless

Java                            DynamoDB
null                            Null
Number [2]                      Number
Boolean                         Boolean
byte[], ByteBuffer              Binary
String                          String
List                            List
Empty Set [3]                   Null
Set<String>                     String Set
Set<Number>                     Number Set
Set<byte[]>, Set<ByteBuffer>    Binary Set
Map [4]                         Map

Any other data type will cause the connector to fail.

[2] i.e. Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal
[3] It is not possible to determine the element type of an empty set.
[4] Map keys must be primitive types, and cannot be optional.
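
A schemaless example along the same lines (again made up): a Java Map containing a Double score of 9.5 and a Set<String> of tags ("a", "b") would become { "score": {"N": "9.5"}, "tags": {"SS": ["a", "b"]} } in DynamoDB.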

Configuration options

region

AWS region for DynamoDB.

  • Type: string
  • Default: ""
  • Importance: high
access.key.id

Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.

  • Type: password
  • Default: [hidden]
  • Importance: low
secret.key

Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.

  • Type: password
  • Default: [hidden]
  • Importance: low
batch.size

Batch size, between 1 (a dedicated PutItemRequest for each record) and 25 (the maximum number of items in a BatchWriteItemRequest).

  • Type: int
  • Default: 1
  • Importance: high
kafka.attributes

Trio of attribute names under which to record the Kafka topic, partition, and offset in each item; set to empty to omit these attributes.

  • Type: list
  • Default: [kafka_topic, kafka_partition, kafka_offset]
  • Importance: high
table.format

Format string for destination DynamoDB table name, use ${topic} as placeholder for source topic.

  • Type: string
  • Default: "${topic}"
  • Importance: high
ignore.record.key

Whether to ignore the Kafka record key when preparing the DynamoDB record.

  • Type: boolean
  • Default: false
  • Importance: medium
ignore.record.value

Whether to ignore the Kafka record value when preparing the DynamoDB record.

  • Type: boolean
  • Default: false
  • Importance: medium
top.key.attribute

DynamoDB attribute name to use for the record key. Leave empty if no top-level envelope attribute is desired.

  • Type: string
  • Default: ""
  • Importance: medium
top.value.attribute

DynamoDB attribute name to use for the record value. Leave empty if no top-level envelope attribute is desired.

  • Type: string
  • Default: ""
  • Importance: medium
max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int
  • Default: 10
  • Importance: medium
retry.backoff.ms

The time in milliseconds to wait following an error before a retry attempt is made.

  • Type: int
  • Default: 3000
  • Importance: medium
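
Putting several of these options together, a sink configuration might look like the following (a sketch; the topic, table format and attribute names are made up):

name=dynamodb-sink-orders
topics=orders
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-west-2
batch.size=25
table.format=kafka_${topic}
ignore.record.key=true
top.value.attribute=payload
kafka.attributes=kafka_topic,kafka_partition,kafka_offset
max.retries=10
retry.backoff.ms=3000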

Source Connector

Example configuration

Ingest all DynamoDB tables in the specified region to Kafka topics with the same names as the source tables:

name=dynamodb-source-test
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2

Record conversion

TODO describe conversion scheme

Limitations

DynamoDB records containing heterogeneous lists (L) or maps (M) are not currently supported; these fields will be silently dropped. It will be possible to add support for them once KAFKA-3910 is implemented.

Configuration options

region

AWS region for DynamoDB.

  • Type: string
  • Default: ""
  • Importance: high
access.key.id

Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.

  • Type: password
  • Default: [hidden]
  • Importance: low
secret.key

Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.

  • Type: password
  • Default: [hidden]
  • Importance: low
topic.format

Format string for destination Kafka topic, use ${table} as placeholder for source table name.

  • Type: string
  • Default: "${table}"
  • Importance: high
tables.prefix

Prefix for DynamoDB tables to source from.

  • Type: string
  • Default: ""
  • Importance: medium
tables.whitelist

Whitelist for DynamoDB tables to source from.

  • Type: list
  • Default: ""
  • Importance: medium
tables.blacklist

Blacklist for DynamoDB tables to source from.

  • Type: list
  • Default: ""
  • Importance: medium
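
Similarly, a source configuration limited to a few prefixed tables might look like the following (a sketch; the table names are made up):

name=dynamodb-source-orders
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2
topic.format=dynamodb_${table}
tables.prefix=prod_
tables.whitelist=prod_orders,prod_customers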


kafka-connect-dynamodb's Issues

null values should omit the attribute instead of putting the Null type

I have an Avro field that is a union of "null" and "int". My expectation is that when this field is null, the field would not be added as an attribute to the item.

This would allow me to filter records using exists or not_exists.

The current situation is I have a mixture of types for this attribute. For example, one item will have Number: 35 and another will have Null: true.

When I try to use the filter exists for this attribute, the result set is a mixture of true and numbers.

Java null pointer exception when deleting items from table

I'm using the source connector for my DynamoDB table.
There are no problems until I delete an item from the table; this is what I get when I delete the item:
[2017-05-01 14:12:28,609] ERROR Task dynamodb-source-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
    at dynamok.source.RecordMapper.toConnect(RecordMapper.java:57)
    at dynamok.source.DynamoDbSourceTask.toSourceRecord(DynamoDbSourceTask.java:120)
    at dynamok.source.DynamoDbSourceTask.lambda$poll$9(DynamoDbSourceTask.java:110)
    at dynamok.source.DynamoDbSourceTask$$Lambda$18/1910327716.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at dynamok.source.DynamoDbSourceTask.poll(DynamoDbSourceTask.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I'm wondering if the connector is trying to get attributes from NewImage, but NewImage is null when deleting an item from the table, so it keeps throwing a NullPointerException.

running the connector with local dynamodb

Hi,

It would be great if it were possible to run the connector against a local DynamoDB instance too.

AmazonDynamoDBClient client = new AmazonDynamoDBClient();

// Set the endpoint URL
client.withEndpoint("http://localhost:8000");

That should do it. Also this could either be the default behaviour or provided through config.

kafka-connect-dynamodb causing connect cluster to become weirdly unresponsive

Hi,

I registered a DynamoDB connector on my cluster and now the cluster has become strangely unresponsive to any REST API endpoints (for example curl -X GET -i http://my-avro-kafka-connect.domain.com/connectors returns a 504 Gateway Time-out error).

curl -X GET -i http://my-avro-kafka-connect.domain.com, however, works, i.e. it does return a 200.

I see nothing in the logs, no exceptions.

Additionally, the connector doesn't work (i.e. it doesn't seem to be writing any data to DynamoDB).

I am using the sink connector with configs like this:

name=my-dynamodb-sink
topics=my-topic
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-east-1
ignore.record.key=true
table.format=my_table_name

Also, FWIW, I built off the head of master and deployed by JAR-ing the contents in target/kafka-connect-dynamodb-0.3.0-SNAPSHOT-package/share/java/kafka-connect-dynamodb and then adding that to the CLASSPATH env variable.

Any ideas on how to debug this further? Thank you.

Source doesn't refresh shards

According to the API documentation:

Shards are ephemeral: They are created and deleted automatically, as needed.
Any shard can also split into multiple new shards; this also occurs automatically.
(Note that it is also possible for a parent shard to have just one child shard.) A
shard might split in response to high levels of write activity on its parent table,
so that applications can process records from multiple shards in parallel.

The source connector only appears to find shards on startup, and if a shard is split or closed off, it doesn't start reading from the children.

My logs are filled with "Shard ID {}for table{} has been closed, it will no longer be polled" messages, and traffic disappears over time.

is it expected that my connect-offsets is 0?

Hi -- so I have two separate instances of this connector running and everything seems to be working well! But I also noticed that my connect-offsets topic has no data in it. Is that to be expected? Are the offsets being stored somewhere else?

Divide by zero exception - assignedShards is empty

Hi Shikhar,

I am trying to run the DynamoDB connector. It runs fine initially, and after some time it throws this exception:

java.lang.ArithmeticException: / by zero
at dynamok.source.DynamoDbSourceTask.poll(DynamoDbSourceTask.java:105)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

which happens at
currentShardIdx = (currentShardIdx + 1) % assignedShards.size();

I am not sure why assignedShards is empty, as I can see data in the output for some time. I tried multiple tables, multiple times, but got the same error.

option to handle or filter out blank strings for the sink

I know this is an issue due to DynamoDB, but it would be really convenient if the connector had an option to simply remove attributes that are blank strings. Currently we are handling this issue (somewhat poorly) upstream in the Kafka Streams stage.

I feel like this is kind of an odd quirk of DynamoDB that I don't really see in many other data stores, and it would be nice if the connector shielded the rest of the system from this annoying detail.

AmazonDynamoDBException: 1 validation error detected

The following configuration:

{
  "connector.class": "dynamok.sink.DynamoDbSinkConnector",
  "name": "kafka-2-dynamodb",
  "ignore.record.value": "true",
  "ignore.record.key": "true",
  "kafka.attributes": "kafka_topic,kafka_partition,kafka_offset",
  "region": "us-west-2",
  "topics": "postgres-company"
}

results in:

com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: 1 validation error detected: Value '{postgres-company=[com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a1e, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a1f, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a20, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a19, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a1a, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a1b, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a1c, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a15, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a16, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195a17, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0d, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0e, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0f, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c10, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c09, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0a, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0b, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c0c, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c05, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c06, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195c14, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195bad, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195bae, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195baf, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195bb0, com.amazonaws.dynamodb.v20120810.WriteRequest@ab195ba9]}' at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1] (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 1755G6B303RMBUD7FOO0647HFNVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1386)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:939)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:714)
    at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:465)
    at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:427)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:376)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2078)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2048)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.batchWriteItem(AmazonDynamoDBClient.java:811)
    at dynamok.sink.DynamoDbSinkTask.put(DynamoDbSinkTask.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List

When using the following config:

{
  "connector.class": "dynamok.source.DynamoDbSourceConnector",
  "name": "dynamodb-2-kafka",
  "tables.regex": "test",
  "tables.whitelist": "test",
  "region": "us-west-2"
}

it results in:

[2016-10-12 14:22:22,699] ERROR Error while starting connector dynamodb-2-kafka (org.apache.kafka.connect.runtime.WorkerConnector)
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.kafka.common.config.AbstractConfig.getList(AbstractConfig.java:100)
    at dynamok.source.ConnectorConfig.<init>(ConnectorConfig.java:64)
    at dynamok.source.DynamoDbSourceConnector.start(DynamoDbSourceConnector.java:74)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:100)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:125)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:182)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:178)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:789)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:307)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:279)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:176)
    at java.lang.Thread.run(Thread.java:745)

java.lang.NullPointerException

The following configuration:

{
  "connector.class": "dynamok.source.DynamoDbSourceConnector",
  "name": "dynamodb-2-kafka",
  "tables.regex": "dev",
  "region": "us-west-2"
}

results in:

[2016-10-12 13:14:59,052] ERROR Error while starting connector dynamodb-2-kafka (org.apache.kafka.connect.runtime.WorkerConnector)
java.lang.NullPointerException
    at dynamok.source.DynamoDbSourceConnector.start(DynamoDbSourceConnector.java:100)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:100)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:125)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:182)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:178)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:789)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:755)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:715)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:206)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:176)
    at java.lang.Thread.run(Thread.java:745)

How to handle invalid messages

Hi Shikhar,

Not an issue per se, but what's the proper way of handling exceptions in the connector? Specifically, if an invalid message gets inserted into a topic -- say JSON with single quotes, or JSON without a proper DynamoDB key -- connect-dynamodb crashes and does not recover unless I delete and re-create the topic and restart the connector. Is there a better way?

thanks,
Maskym

Dynamo Db source issue

The connector keeps getting this error:
org.apache.kafka.connect.errors.ConnectException: No remaining source shards
at dynamok.source.DynamoDbSourceTask.poll(DynamoDbSourceTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
