
kafka-connect-rest's Introduction

Kafka Connect REST connector

Building and running the Spring example in Docker

Build the project and copy the jars to the example

mvn clean install && \
cd examples/spring/gs-rest-service && \
mvn clean install && \
cd .. && \
cp ../../kafka-connect-rest-plugin/target/kafka-connect-rest-plugin-*-shaded.jar jars/ && \
cp ../../kafka-connect-transform-from-json/kafka-connect-transform-from-json-plugin/target/kafka-connect-transform-from-json-plugin-*-shaded.jar jars/ && \
cp ../../kafka-connect-transform-add-headers/target/kafka-connect-transform-add-headers-*-shaded.jar jars/ && \
cp ../../kafka-connect-transform-velocity-eval/target/kafka-connect-transform-velocity-eval-*-shaded.jar jars/

Bring up the Docker containers

docker-compose up -d

Create the destination topic

docker exec -it spring_connect_1 bash -c \
 "kafka-topics --zookeeper zookeeper \
   --topic restSourceDestinationTopic --create \
   --replication-factor 1 --partitions 1"

Configure the sink and source connectors

curl -X POST \
   -H 'Host: connect.example.com' \
   -H 'Accept: application/json' \
   -H 'Content-Type: application/json' \
  http://localhost:8083/connectors -d @config/sink.json

curl -X POST \
   -H 'Host: connect.example.com' \
   -H 'Accept: application/json' \
   -H 'Content-Type: application/json' \
  http://localhost:8083/connectors -d @config/source.json

View the contents of the destination topic

docker exec -it spring_connect_1 bash -c \
 "kafka-avro-console-consumer --bootstrap-server kafka:9092 \
  --topic restSourceDestinationTopic --from-beginning \
  --property schema.registry.url=http://schema_registry:8081/"

View the webserver logs

docker logs -f spring_webservice_1

Shut down the Docker containers

docker-compose down
cd ../..

If you don't want to use Avro

Change CONNECT_VALUE_CONVERTER in the docker-compose.yml to org.apache.kafka.connect.storage.StringConverter if you don't want to use Avro.
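
The converter is typically set through the Connect worker's environment in docker-compose.yml; an illustrative fragment (the exact service layout depends on your compose file):

environment:
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter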

docker exec -it spring_connect_1 bash -c \
 "kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic restSourceDestinationTopic --from-beginning"

Building and running the Google Cloud Function example in Docker (currently untested)

You will need gcloud installed and a GCP project with payments enabled.

mvn clean install
cd examples/gcf

Replace '<REGION>' and '<PROJECTID>' in rest.source.url in config/source.json.

"rest.source.url": "https://<REGION>-<PROJECTID>.cloudfunctions.net/hello",

gcloud beta functions deploy hello --trigger-http

curl -X POST https://<REGION>-<PROJECTID>.cloudfunctions.net/hello -d 'name=Kafka Connect'

docker-compose up -d

docker exec -it gcf_connect_1 bash -c \
 "kafka-topics --zookeeper zookeeper \
   --topic restSourceDestinationTopic --create \
   --replication-factor 1 --partitions 1"

curl -X POST \
   -H 'Host: connect.example.com' \
   -H 'Accept: application/json' \
   -H 'Content-Type: application/json' \
  http://localhost:8083/connectors -d @config/source.json

docker exec -it gcf_connect_1 bash -c \
 "kafka-avro-console-consumer --bootstrap-server kafka:9092 \
  --topic restSourceDestinationTopic --from-beginning \
  --property schema.registry.url=http://schema_registry:8081/"

docker-compose down

kafka-connect-rest's People

Contributors

antonioiorfino, dependabot[bot], kathleentully, kleino, lenimartin, llofberg, sknop


kafka-connect-rest's Issues

Json sink

The current behavior is to use the value's toString() result as the POST body. When using this with the JsonConverter, the value is a Map<String, Object>, which is stringified as {key1={key2=value2}}; that is not desirable. Instead, I would like to serialize it to JSON using Kafka's Jackson dependency.

// Convert to String for outgoing REST calls
public String convert(SinkRecord record) {
  return record.value().toString();
}

Can be changed to,

private static final ObjectMapper mapper = new ObjectMapper();

// Convert to String for outgoing REST calls
public String convert(SinkRecord record) {
  try {
    return mapper.writeValueAsString(record.value());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

This of course may not be desirable for others. I'd recommend having a pluggable serializer, as the current behavior was surprising when integrating with a source pushing Structs.
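
A pluggable serializer could be as small as one interface. The sketch below is illustrative only: BodySerializer and both implementations are invented names, not part of the connector, and a hand-rolled JSON writer stands in for Jackson so the example is self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical pluggable serializer interface (invented for illustration).
interface BodySerializer {
    String serialize(Object value);
}

// Current behavior: toString(), which renders nested maps as {key1={key2=value2}}.
class ToStringSerializer implements BodySerializer {
    public String serialize(Object value) {
        return String.valueOf(value);
    }
}

// Minimal JSON serializer for maps of strings/numbers, standing in for Jackson.
class SimpleJsonSerializer implements BodySerializer {
    public String serialize(Object value) {
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) sb.append(",");
                first = false;
                sb.append("\"").append(e.getKey()).append("\":").append(serialize(e.getValue()));
            }
            return sb.append("}").toString();
        }
        if (value instanceof Number || value instanceof Boolean) return String.valueOf(value);
        return "\"" + value + "\"";
    }
}

public class SerializerDemo {
    public static void main(String[] args) {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("key2", "value2");
        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("key1", inner);

        System.out.println(new ToStringSerializer().serialize(outer));   // {key1={key2=value2}}
        System.out.println(new SimpleJsonSerializer().serialize(outer)); // {"key1":{"key2":"value2"}}
    }
}
```

Which implementation to use could then be a connector config property, keeping the current toString() behavior as the default.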

support for dynamic request payload

Hi,

I am looking at your code (thanks for making it available - very helpful) with a view to supporting dynamic request payloads. The use case is stateful APIs. For instance requesting the last n seconds of results from an API or page n of the results.

My thoughts are that it requires the response handler to be able to alter the request payload data (used by the next request). There may be a need to indicate whether another request should be made immediately (paging) or whether this polling cycle is done.
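
As a rough illustration of that flow, here is a sketch under the assumption of a handler-style interface. ResponseHandler, PagingHandler and fakeApi are all invented for this example; the connector has no such API today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical interface: after each response, the handler may return the
// payload for an immediate follow-up request (paging), or empty to end the
// current polling cycle.
interface ResponseHandler {
    Optional<String> nextPayload(String response);
}

// Example: page through results until the API reports the last page.
class PagingHandler implements ResponseHandler {
    private int page = 1;

    public Optional<String> nextPayload(String response) {
        // Assume the API marks the final page with "last":true.
        if (response.contains("\"last\":true")) return Optional.empty();
        page++;
        return Optional.of("page=" + page);
    }
}

public class PagingDemo {
    // Stand-in for the real HTTP call: a fake three-page API.
    static String fakeApi(String payload) {
        int page = Integer.parseInt(payload.substring("page=".length()));
        return "{\"page\":" + page + ",\"last\":" + (page >= 3) + "}";
    }

    public static void main(String[] args) {
        ResponseHandler handler = new PagingHandler();
        List<String> responses = new ArrayList<>();
        String payload = "page=1";
        while (true) {
            String response = fakeApi(payload);
            responses.add(response);
            Optional<String> next = handler.nextPayload(response);
            if (!next.isPresent()) break;   // cycle done; wait for the next poll interval
            payload = next.get();           // immediate follow-up request
        }
        System.out.println(responses);      // three responses, pages 1..3
    }
}
```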

Is this something you have designed for or thought of? Is it a feature you would be interested in adding (if I submitted a PR)? Do you have any thoughts on implementation?

Thanks,

Anthony.

Quick Question regarding GET request with header

Hi,

I don't know if this is still supported. I am currently testing this connector in a docker-compose setup, using the configuration below to extract information from a forex API that takes its API key in a header. The connector is returning a 401 error; I am wondering if this is a configuration issue on my end.

Configuration:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ \
  -d '{
    "name": "Source-REST-Oanda-5M",
    "config": {
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "tasks.max": "1",
      "rest.source.method": "GET",
      "rest.source.poll.interval.ms": "30000",
      "rest.source.destination.topics": "test",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.properties": "Content-Type:application/json,Accept::application/json",
      "rest.source.headers": "Content-Type:application/json,Accept:application/json,Authorization: Bearer ",
      "rest.source.url": "https://api-fxpractice.oanda.com/v3/instruments/EUR_USD/candles?count=6&price=M&granularity=M5"
    }
  }'

Error in AvroData.java while creating Source Connector

Caused by: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/avro/Schema.<clinit>()V @39: invokevirtual
  Reason:
    Type 'com/fasterxml/jackson/databind/ObjectMapper' (current frame, stack[1]) is not assignable to 'com/fasterxml/jackson/core/ObjectCodec'
  Current Frame:
    bci: @39
    flags: { }
    locals: { }
    stack: { 'com/fasterxml/jackson/core/JsonFactory', 'com/fasterxml/jackson/databind/ObjectMapper' }
Bytecode:
0x0000000: bb01 1a59 b701 1bb3 004f bb01 1c59 b200
0x0000010: 4fb7 011d b300 edb2 004f b201 1eb6 011f
0x0000020: 57b2 004f b200 edb6 0120 57bb 0121 5910
0x0000030: 0abd 00c8 5903 12a6 5359 0412 b353 5905
0x0000040: 12d4 5359 0612 a753 5907 12a4 5359 0812
0x0000050: d953 5910 0612 ce53 5910 0712 d753 5910
0x0000060: 0812 5c53 5910 0912 e753 b801 22b7 0123
0x0000070: b300 0cbb 0121 59b2 000c b701 23b3 000b
0x0000080: b200 0b12 bfb9 00ec 0200 57bb 0121 59b7
0x0000090: 0124 b300 09b2 0009 1006 bd00 c859 0312
0x00000a0: bf53 5904 12a6 5359 0512 a753 5906 12bb
0x00000b0: 5359 0712 5c53 5908 12e7 53b8 0125 57ba
0x00000c0: 0126 0000 b801 27b3 0005 ba01 2800 00b8
0x00000d0: 0127 b300 04bb 00f7 59b7 0118 b300 abb2
0x00000e0: 00ab 1301 29b2 012a b901 0103 0057 b200
0x00000f0: ab13 012b b201 2cb9 0101 0300 57b2 00ab
0x0000100: 1301 2db2 012e b901 0103 0057 b200 ab13
0x0000110: 012f b201 30b9 0101 0300 57b2 00ab 1301
0x0000120: 31b2 00c0 b901 0103 0057 b200 ab13 0132
0x0000130: b200 c1b9 0101 0300 57b2 00ab 1301 33b2
0x0000140: 0134 b901 0103 0057 b200 ab13 0135 b200
0x0000150: 67b9 0101 0300 57ba 0136 0000 b801 27b3
0x0000160: 0003 ba01 3700 00b8 0127 b300 02b1

at io.confluent.connect.avro.AvroData.<clinit>(AvroData.java:155)
at org.apache.kafka.connect.transforms.FromJson.<init>(FromJson.java:33)
at org.apache.kafka.connect.transforms.FromJson$Value.<init>(FromJson.java:90)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at org.apache.kafka.connect.runtime.ConnectorConfig.getConfigDefFromTransformation(ConnectorConfig.java:321)
at org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:296)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:263)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:538)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:535)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:271)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:220)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

[2020-01-23 04:04:45,549] INFO 172.18.0.1 - - [23/Jan/2020:04:04:45 +0000] "POST /connectors HTTP/1.1" 500 0 275 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:04:57,543] INFO 172.18.0.1 - - [23/Jan/2020:04:04:57 +0000] "GET /connectors HTTP/1.1" 200 21 8 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:04:58,667] INFO 172.18.0.1 - - [23/Jan/2020:04:04:58 +0000] "GET /connectors HTTP/1.1" 200 21 5 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:04:59,657] INFO 172.18.0.1 - - [23/Jan/2020:04:04:59 +0000] "GET /connectors HTTP/1.1" 200 21 11 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:05:01,136] INFO 172.18.0.1 - - [23/Jan/2020:04:05:01 +0000] "GET /connectors HTTP/1.1" 200 21 6 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:05:06,306] INFO 172.18.0.1 - - [23/Jan/2020:04:05:06 +0000] "GET /connectors HTTP/1.1" 200 21 7 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:06:17,975] INFO Cluster ID: 1huMfB1eTuW8-VRZBsVrMg (org.apache.kafka.clients.Metadata)
[2020-01-23 04:07:14,191] INFO 172.18.0.1 - - [23/Jan/2020:04:07:14 +0000] "HEAD / HTTP/1.1" 200 0 6 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:07:14,331] INFO 172.18.0.1 - - [23/Jan/2020:04:07:14 +0000] "HEAD / HTTP/1.1" 200 0 4 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:07:41,339] INFO 172.18.0.1 - - [23/Jan/2020:04:07:41 +0000] "HEAD / HTTP/1.1" 200 0 3 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:08:16,762] INFO 172.18.0.1 - - [23/Jan/2020:04:08:16 +0000] "GET /connectors HTTP/1.1" 200 21 9 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:08:17,518] INFO 172.18.0.1 - - [23/Jan/2020:04:08:17 +0000] "GET /connectors HTTP/1.1" 200 21 7 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:09:17,855] INFO 172.18.0.1 - - [23/Jan/2020:04:09:17 +0000] "POST /connectors HTTP/1.1" 409 73 85 (org.apache.kafka.connect.runtime.rest.RestServer)
[2020-01-23 04:09:32,873] WARN /connectors (org.eclipse.jetty.server.HttpChannel)
javax.servlet.ServletException: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.connect.avro.AvroData
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:531)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.connect.avro.AvroData
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:432)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:370)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:389)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:342)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:229)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)

`\x0A` in JSON from source

Using this config:

{
  "name": "rest-source-river_levels",
  "config": {
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.poll.interval.ms": "110000",
    "rest.source.method": "GET",
    "rest.source.url": "http://environment.data.gov.uk/flood-monitoring/id/stations/3680",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept::application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.destination.topics": "river_levels"
  }
}

Data is successfully retrieved, but it is written to the topic with a literal \x0A for each newline in the source message. This breaks the JSON.

Seen from KSQL it looks like this:

ksql> print 'river_levels' from beginning;
Format:STRING
19/07/18 22:57:25 BST , null , { \x0A  "@context" : "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld" ,\x0A  "meta" : { \x0A    "publisher" : "Environment Agency" ,\x0A    "licence" : "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/" ,\x0A    "documentation" : "http://environment.data.gov.uk/flood-monitoring/doc/reference" ,\x0A    "version" : "0.9" ,\x0A    "comment" : "Status: Beta service" ,\x0A    "hasFormat" : [ "http://environment.data.gov.uk/flood-monitoring/id/stations/3680.rdf", "http://environment.data.gov.uk/flood-monitoring/id/stations/3680.ttl", "http://environment.data.gov.uk/flood-monitoring/id/stations/3680.html" ]\x0A  }\x0A   ,\x0A  "items" : { \x0A    "@id" : "http://environment.data.gov.uk/flood-monitoring/id/stations/3680" ,\x0A    "eaRegionName" : "Midland" ,\x0A    "easting" : 467950 ,\x0A    "gridReference" : "SK679153" ,\x0A    "label" : "Rainfall station" ,\x0A    "lat" : 52.73152 ,\x0A    "long" : -0.995167 ,\x0A    "measures" : [ { \x0A      "@id" : "http://environment.data.gov.uk/flood-monitoring/id/measures/3680-rainfall-tipping_bucket_raingauge-t-15_min-mm" ,\x0A      "label" : "rainfall-tipping_bucket_raingauge-t-15_min-mm" ,\x0A      "latestReading" : { \x0A        "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/3680-rainfall-tipping_bucket_raingauge-t-15_min-mm/2018-07-05T18-00-00Z" ,\x0A        "date" : "2018-07-05" ,\x0A        "dateTime" : "2018-07-05T18:00:00Z" ,\x0A        "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/3680-rainfall-tipping_bucket_raingauge-t-15_min-mm" ,\x0A        "value" : 0.0\x0A      }\x0A       ,\x0A      "notation" : "3680-rainfall-tipping_bucket_raingauge-t-15_min-mm" ,\x0A      "parameter" : "rainfall" ,\x0A      "parameterName" : "Rainfall" ,\x0A      "period" : 900 ,\x0A      "qualifier" : "Tipping Bucket Raingauge" ,\x0A      "station" : "http://environment.data.gov.uk/flood-monitoring/id/stations/3680" 
,\x0A      "stationReference" : "3680" ,\x0A      "type" : [ "http://environment.data.gov.uk/flood-monitoring/def/core/Measure", "http://environment.data.gov.uk/flood-monitoring/def/core/Rainfall" ] ,\x0A      "unit" : "http://qudt.org/1.1/vocab/unit#Millimeter" ,\x0A      "unitName" : "mm" ,\x0A      "valueType" : "total"\x0A    }\x0A    , { \x0A      "@id" : "http://environment.data.gov.uk/flood-monitoring/id/measures/3680-temperature-dry_bulb-i-15_min-deg_C" ,\x0A      "label" : "temperature-dry_bulb-i-15_min-deg_C" ,\x0A      "latestReading" : { \x0A        "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/3680-temperature-dry_bulb-i-15_min-deg_C/2018-07-05T18-00-00Z" ,\x0A        "date" : "2018-07-05" ,\x0A        "dateTime" : "2018-07-05T18:00:00Z" ,\x0A        "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/3680-temperature-dry_bulb-i-15_min-deg_C" ,\x0A        "value" : 26.9\x0A      }\x0A       ,\x0A      "notation" : "3680-temperature-dry_bulb-i-15_min-deg_C" ,\x0A      "parameter" : "temperature" ,\x0A      "parameterName" : "Temperature" ,\x0A      "period" : 900 ,\x0A      "qualifier" : "Dry Bulb" ,\x0A      "station" : "http://environment.data.gov.uk/flood-monitoring/id/stations/3680" ,\x0A      "stationReference" : "3680" ,\x0A      "type" : [ "http://environment.data.gov.uk/flood-monitoring/def/core/Measure", "http://environment.data.gov.uk/flood-monitoring/def/core/Temperature" ] ,\x0A      "unit" : "http://qudt.org/1.1/vocab/unit#DegreeCentigrade" ,\x0A      "unitName" : "deg C" ,\x0A      "valueType" : "instantaneous"\x0A    }\x0A    , { \x0A      "@id" : "http://environment.data.gov.uk/flood-monitoring/id/measures/3680-temperature-dry_bulb-i-1_h-deg_C" ,\x0A      "label" : "Brooksby RAIN - temperature-dry_bulb-i-1_h-deg_C" ,\x0A      "notation" : "3680-temperature-dry_bulb-i-1_h-deg_C" ,\x0A      "parameter" : "temperature" ,\x0A      "parameterName" : "Temperature" ,\x0A      "period" : 3600 ,\x0A   
   "qualifier" : "Dry Bulb" ,\x0A      "station" : "http://environment.data.gov.uk/flood-monitoring/id/stations/3680" ,\x0A      "stationReference" : "3680" ,\x0A      "type" : [ "http://environment.data.gov.uk/flood-monitoring/def/core/Measure", "http://environment.data.gov.uk/flood-monitoring/def/core/Temperature" ] ,\x0A      "unit" : "http://qudt.org/1.1/vocab/unit#DegreeCentigrade" ,\x0A      "unitName" : "deg C" ,\x0A      "valueType" : "instantaneous"\x0A    }\x0A     ] ,\x0A    "northing" : 315350 ,\x0A    "notation" : "3680" ,\x0A    "stationReference" : "3680" ,\x0A    "type" : "http://environment.data.gov.uk/flood-monitoring/def/core/Station"\x0A  }\x0A}\x0A\x0A

Is there a way to write valid JSON to the Kafka topic? For example, replace \x0A with a normal space character?
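
Until the connector handles this itself, one workaround is to collapse the newlines in the payload before it reaches the topic. The sketch below is illustrative; collapseNewlines is an invented helper, not part of the connector. It is safe for valid JSON because raw newlines cannot appear inside JSON string literals (they must be escaped as \n).

```java
public class NewlineStrip {
    // Collapse each newline, together with surrounding whitespace,
    // into a single space. JSON outside string literals is
    // whitespace-insensitive, so the document stays valid.
    static String collapseNewlines(String payload) {
        return payload.replaceAll("\\s*\\n\\s*", " ").trim();
    }

    public static void main(String[] args) {
        String raw = "{ \n  \"meta\" : { \n    \"version\" : \"0.9\"\n  }\n}\n";
        System.out.println(collapseNewlines(raw));
        // { "meta" : { "version" : "0.9" } }
    }
}
```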

Running in distributed mode

Hi @llofberg

I am trying to run the connector in distributed mode. Will the framework handle that for us by default, or do we need to implement anything inside the poll method? From what I can see, when running in distributed mode with two tasks my data gets duplicated: each task is calling the URL.

Thanks,
Justin Joseph
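
Kafka Connect itself will not deduplicate work across tasks: each task runs its own poll() loop, so two tasks polling the same URL will produce the data twice. Avoiding that requires the connector's taskConfigs() to hand each task a distinct slice of work. A hypothetical sketch (TaskSplitDemo and the one-URL-per-task scheme are invented for illustration, not how this connector behaves today):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskSplitDemo {
    // Mirror of Connector.taskConfigs(int maxTasks): split a comma-separated
    // URL list round-robin across at most maxTasks task configurations.
    static List<Map<String, String>> taskConfigs(String urlsCsv, int maxTasks) {
        String[] urls = urlsCsv.split(",");
        int tasks = Math.min(maxTasks, urls.length);
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            configs.add(new HashMap<>());
        }
        for (int i = 0; i < urls.length; i++) {
            // Each task config accumulates only its own URLs.
            configs.get(i % tasks).merge("rest.source.url", urls[i], (a, b) -> a + "," + b);
        }
        return configs;
    }

    public static void main(String[] args) {
        // Two task configs: one polling http://a and http://c, one polling http://b.
        System.out.println(taskConfigs("http://a,http://b,http://c", 2));
    }
}
```

With a single URL there is only one unit of work, so running tasks.max > 1 cannot help; one task config is all that can be produced without duplication.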

compilation error

Hello,
I'm trying to compile the project and I get the following error.
I think it could be caused by the Java version that I use to compile.

What version should be used to compile the project? 1.7? 1.8?

could you distribute a new release with the compiled connector?

Thank you !

[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] Failure executing javac, but could not parse the error:
javac: invalid target release: 1.8
Usage: javac <options> <source files>
use -help for a list of possible options

[INFO] 1 error
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.370 s
[INFO] Finished at: 2018-03-22T12:30:15+01:00
[INFO] Final Memory: 10M/229M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:2.5.1:compile (default-compile) on project kafka-connect-rest: Compilation failure
[ERROR] Failure executing javac, but could not parse the error:
[ERROR] javac: invalid target release: 1.8
[ERROR] Usage: javac <options> <source files>
[ERROR] use -help for a list of possible options

edit:

I have finally been able to compile correctly by configuring JAVA_HOME to point to Java 1.8.
It would be nice to state this requirement in the documentation.

kafka-connect-rest and confluent-5.0.0

Hello,
I would like to use kafka-connect-rest with Confluent 5.0.0.
After the following steps:
1- git clone https://github.com/llofberg/kafka-connect-rest.git
2- modifying pom.xml to have
<kafka.version>2.0.0</kafka.version>
<confluent.version>5.0.0</confluent.version>
3- mvn clean install
I got:
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/dupond/confluent-5.0.0/plugin/kafka-connect-rest/src/test/java/com/tm/kafka/connect/rest/RestTaskTest.java:[89,26] error: incompatible types: SourceTaskContext is not a functional interface
[ERROR] multiple non-overriding abstract methods found in interface SourceTaskContext
/home/dupond/confluent-5.0.0/plugin/kafka-connect-rest/src/test/java/com/tm/kafka/connect/rest/RestTaskTest.java:[105,26] error: incompatible types: SourceTaskContext is not a functional interface
[ERROR] multiple non-overriding abstract methods found in interface SourceTaskContext
/home/dupond/confluent-5.0.0/plugin/kafka-connect-rest/src/test/java/com/tm/kafka/connect/rest/RestTaskTest.java:[207,19] error: MockSinkTaskContext is not abstract and does not override abstract method configs() in SinkTaskContext
[INFO] 3 errors

configuration: Maven 3.5.4, Confluent 5.0.0, Debian 9, Java 1.8

Are there other basic modifications I could do to have this connector working?
Thank you
Martin

Question about pulling data from https endpoint

I am trying to retrieve data from an HTTPS endpoint and see the following exception:

ERROR HTTP call execution failed null (com.tm.kafka.connect.rest.RestSourceTask)
org.apache.kafka.connect.errors.RetriableException
	at com.tm.kafka.connect.rest.http.executor.OkHttpRequestExecutor.execute(OkHttpRequestExecutor.java:67)
	at com.tm.kafka.connect.rest.RestSourceTask.poll(RestSourceTask.java:89)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at okio.RealBufferedSource.require(RealBufferedSource.java:65)
	at okio.GzipSource.consumeHeader(GzipSource.java:114)
	at okio.GzipSource.read(GzipSource.java:73)
	at okio.RealBufferedSource.select(RealBufferedSource.java:100)
	at okhttp3.internal.Util.bomAwareCharset(Util.java:467)
	at okhttp3.ResponseBody.string(ResponseBody.java:181)
	at com.tm.kafka.connect.rest.http.executor.OkHttpRequestExecutor.execute(OkHttpRequestExecutor.java:64)

I am wondering whether HTTPS REST calls are supported.

Sending data from kafka topic to multiple endpoint URLs

hey Lenny,
I finally got this connector up and running. My problem before was a mismatched Kafka version; I used 1.0.0 instead of 1.1.0, so upgrading to 1.1.0 solved it.
Can this connector send data to multiple endpoint URLs, or can it only send data to one URL?

URL parameters in source connector don't seem to work

private String createUrl(String url, String payload) throws UnsupportedEncodingException {
  if (payload == null || payload.trim().isEmpty()) {
    return url;
  }
  String format;
  if (url.endsWith("?")) {
    format = "%s&%s";
  } else {
    format = "%s?%s";
  }
  return String.format(format, url, URLEncoder.encode(payload, "UTF-8"));
}

This code takes the payload string and URL-encodes it (line 85). However, if the string was of the form a=b then it will be encoded to a%3Db, which is no longer a valid parameter of the form key=value.

The simple solution would be to avoid URL Encoding, but this would throw the responsibility for encoding parameter values onto the user (person creating the config) which may be undesirable.

There is a deeper issue here as well. The current code only supports URL parameters on GET requests, but I can't see why they wouldn't be valid on any request (eg POST).
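
One way to keep the encoding responsibility in the connector while preserving the query structure is to encode each key and each value individually, rather than the payload as a whole. The sketch below is illustrative; encodeQuery is an invented helper, not the connector's code.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ParamEncode {
    // Encode keys and values separately so '=' and '&' keep their
    // structural meaning in the query string.
    static String encodeQuery(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String pair : payload.split("&")) {
            if (sb.length() > 0) sb.append('&');
            int eq = pair.indexOf('=');
            if (eq < 0) {
                sb.append(URLEncoder.encode(pair, StandardCharsets.UTF_8));
            } else {
                sb.append(URLEncoder.encode(pair.substring(0, eq), StandardCharsets.UTF_8))
                  .append('=')
                  .append(URLEncoder.encode(pair.substring(eq + 1), StandardCharsets.UTF_8));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Encoding the whole payload mangles the separators:
        System.out.println(URLEncoder.encode("a=b&c=d e", StandardCharsets.UTF_8)); // a%3Db%26c%3Dd+e
        // Encoding per component keeps them intact:
        System.out.println(encodeQuery("a=b&c=d e"));                               // a=b&c=d+e
    }
}
```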

{id=RestSinkConnector-0} Task is being killed and will not recover until manually restarted

Hi @llofberg ,

I'm trying to use your sink connector, but I'm facing this issue and I'm unable to work out the cause:

echo "===> Launching ... "
+ echo '===> Launching ... '
exec /etc/confluent/docker/launch
+ exec /etc/confluent/docker/launch
[2018-06-14 14:21:28,150] INFO Loading plugin from: /jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,151] DEBUG Loading plugin urls: [file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar] (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar (org.reflections.Reflections)
[2018-06-14 14:21:28,200] DEBUG [Thread[org.reflections-scanner-0,5,main]] scanning file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar (org.reflections.Reflections)
[2018-06-14 14:21:28,253] DEBUG could not scan file META-INF/MANIFEST.MF in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,277] DEBUG could not scan file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
org.reflections.ReflectionsException: could not create class object from file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.xml
[2018-06-14 14:21:28,277] DEBUG could not scan file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
org.reflections.ReflectionsException: could not create class object from file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.properties
[2018-06-14 14:21:28,283] DEBUG could not scan file META-INF/maven/org.slf4j/slf4j-api/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,284] DEBUG could not scan file META-INF/maven/org.slf4j/slf4j-api/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,284] DEBUG could not scan file META-INF/NOTICE.txt in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,289] DEBUG could not scan file META-INF/LICENSE.txt in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,311] DEBUG could not scan file META-INF/maven/org.apache.commons/commons-compress/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,331] DEBUG could not scan file META-INF/maven/org.apache.commons/commons-compress/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,334] DEBUG could not scan file META-INF/LICENSE in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,335] DEBUG could not scan file META-INF/NOTICE in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,347] DEBUG could not scan file org/apache/velocity/runtime/defaults/velocity.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,351] DEBUG could not scan file org/apache/velocity/runtime/defaults/directive.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,364] DEBUG could not scan file META-INF/DEPENDENCIES in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,366] DEBUG could not scan file META-INF/maven/org.apache.velocity/velocity-engine-core/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,371] DEBUG could not scan file META-INF/maven/org.apache.velocity/velocity-engine-core/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,373] DEBUG could not scan file META-INF/maven/commons-io/commons-io/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,373] DEBUG could not scan file META-INF/maven/commons-io/commons-io/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,380] DEBUG could not scan file META-INF/maven/org.apache.commons/commons-lang3/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,400] DEBUG could not scan file META-INF/maven/org.apache.commons/commons-lang3/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,414] WARN could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
[2018-06-14 14:21:28,420] WARN could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
[2018-06-14 14:21:28,429] WARN could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.selector.TopicSelector
[2018-06-14 14:21:28,460] INFO Registered loader: PluginClassLoader{pluginLocation=file:/jars/kafka-connect-rest-1.0-SNAPSHOT-shaded.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,460] INFO Added plugin 'com.tm.kafka.connect.rest.RestSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,461] INFO Added plugin 'com.tm.kafka.connect.rest.RestSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,470] INFO Loading plugin from: /jars/kafka-connect-rest-1.0-SNAPSHOT.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,471] DEBUG Loading plugin urls: [file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar] (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar (org.reflections.Reflections)
[2018-06-14 14:21:28,471] DEBUG [Thread[org.reflections-scanner-0,5,main]] scanning file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar (org.reflections.Reflections)
[2018-06-14 14:21:28,471] DEBUG could not scan file META-INF/MANIFEST.MF in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar with scanner SubTypesScanner (org.reflections.Reflections)
[2018-06-14 14:21:28,474] DEBUG could not scan file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.xml in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar with scanner SubTypesScanner (org.reflections.Reflections)
org.reflections.ReflectionsException: could not create class object from file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.xml
[2018-06-14 14:21:28,474] DEBUG could not scan file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.properties in url file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar with scanner SubTypesScanner (org.reflections.Reflections)
org.reflections.ReflectionsException: could not create class object from file META-INF/maven/com.tm.kafka/kafka-connect-rest/pom.properties
[2018-06-14 14:21:28,475] WARN could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.selector.TopicSelector
[2018-06-14 14:21:28,475] WARN could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
[2018-06-14 14:21:28,475] WARN could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
[2018-06-14 14:21:28,483] INFO Registered loader: PluginClassLoader{pluginLocation=file:/jars/kafka-connect-rest-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:28,493] WARN could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.selector.TopicSelector
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.selector.TopicSelector
[2018-06-14 14:21:28,493] WARN could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.SinkRecordToPayloadConverter
[2018-06-14 14:21:28,493] WARN could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
Caused by: java.lang.ClassNotFoundException: com.tm.kafka.connect.rest.converter.PayloadToSourceRecordConverter
        rest.advertised.host.name = kafka-connect
        rest.advertised.listener = null
        rest.advertised.port = null
        rest.host.name = kafka-connect
        rest.port = 8083
[2018-06-14 14:21:30,375] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:30,375] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:30,375] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:30,594] INFO Added connector for http://kafka-connect:8083 (org.apache.kafka.connect.runtime.rest.RestServer)
[2018-06-14 14:21:30,699] INFO Advertised URI: http://kafka-connect:8083/ (org.apache.kafka.connect.runtime.rest.RestServer)
[2018-06-14 14:21:30,866] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer)
[2018-06-14 14:21:30,869] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:30,869] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:30,869] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,001] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,001] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,001] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,021] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,021] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,021] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,047] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,047] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,047] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,161] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,161] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,161] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,164] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,164] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,164] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,222] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,222] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-14 14:21:31,222] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Jun 14, 2018 2:21:31 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2018-06-14 14:21:31,311] INFO Advertised URI: http://kafka-connect:8083/ (org.apache.kafka.connect.runtime.rest.RestServer)
[2018-06-14 14:21:31,311] INFO REST server listening at http://kafka-connect:8083/, advertising URL http://kafka-connect:8083/ (org.apache.kafka.connect.runtime.rest.RestServer)
[2018-06-14 14:21:31,331] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,331] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,331] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2018-06-14 14:21:31,333] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,333] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,333] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-06-14 14:21:31,347] DEBUG Updating configuration for connector RestSinkConnector configuration: {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
[2018-06-14 14:21:31,347] DEBUG Storing new config for task RestSinkConnector-0 this will wait for a commit message before the new config will take effect. New config: {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, task.class=com.tm.kafka.connect.rest.RestSinkTask, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
[2018-06-14 14:21:34,859] DEBUG Getting plugin class loader for connector: 'com.tm.kafka.connect.rest.RestSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:34,859] INFO Creating connector RestSinkConnector of type com.tm.kafka.connect.rest.RestSinkConnector (org.apache.kafka.connect.runtime.Worker)
        task.class = class com.tm.kafka.connect.rest.RestSinkTask
[2018-06-14 14:21:34,860] INFO Instantiated task RestSinkConnector-0 with version 1.0-SNAPSHOT of type com.tm.kafka.connect.rest.RestSinkTask (org.apache.kafka.connect.runtime.Worker)
[2018-06-14 14:21:34,862] INFO Instantiated connector RestSinkConnector with version 1.0-SNAPSHOT of type class com.tm.kafka.connect.rest.RestSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2018-06-14 14:21:34,862] DEBUG Getting plugin class loader for connector: 'com.tm.kafka.connect.rest.RestSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2018-06-14 14:21:34,862] DEBUG WorkerConnector{id=RestSinkConnector} Initializing connector RestSinkConnector with config {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.runtime.WorkerConnector)
        rest.sink.method = POST
        rest.sink.payload.converter.class = class com.tm.kafka.connect.rest.converter.JsonPayloadConverter
        rest.sink.payload.converter.schema = true
        rest.sink.properties = [Content-Type:application/json]
        rest.sink.retry.backoff.ms = 5000
        rest.sink.url = http://webservice:8080/count
        rest.sink.velocity.template = rest.vm
 (com.tm.kafka.connect.rest.RestSinkConnectorConfig)
[2018-06-14 14:21:34,890] DEBUG Getting plugin class loader for connector: 'com.tm.kafka.connect.rest.RestSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
        connector.class = com.tm.kafka.connect.rest.RestSinkConnector
[2018-06-14 14:21:34,890] DEBUG Getting plugin class loader for connector: 'com.tm.kafka.connect.rest.RestSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
        rest.sink.method = POST
        rest.sink.payload.converter.class = class com.tm.kafka.connect.rest.converter.JsonPayloadConverter
        rest.sink.payload.converter.schema = true
        rest.sink.properties = [Content-Type:application/json]
        rest.sink.retry.backoff.ms = 5000
        rest.sink.url = http://webservice:8080/count
        rest.sink.velocity.template = rest.vm
 (com.tm.kafka.connect.rest.RestSinkConnectorConfig)
[2018-06-14 14:21:37,961] ERROR WorkerSinkTask{id=RestSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-06-14 14:21:37,961] DEBUG Stopping sink task, setting client to null (com.tm.kafka.connect.rest.RestSinkTask)

Could you please help me understand what's wrong in my configuration?

Many thanks,

Diego
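
For reference, this is the sink connector configuration I submitted, reconstructed here from the config map visible in the log above (so it should match what Connect actually received; the topic name and URL are exactly as they appear there):

```json
{
  "name": "RestSinkConnector",
  "config": {
    "connector.class": "com.tm.kafka.connect.rest.RestSinkConnector",
    "tasks.max": "1",
    "topics": "wimt_collaudo",
    "rest.sink.method": "POST",
    "rest.sink.url": "http://webservice:8080/count",
    "rest.sink.properties": "Content-Type:application/json",
    "rest.sink.payload.converter.class": "com.tm.kafka.connect.rest.converter.JsonPayloadConverter",
    "rest.sink.payload.converter.schema": "true"
  }
}
```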
