
castorm / kafka-connect-http


Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.

Home Page: https://castorm.github.io/kafka-connect-http/

License: Apache License 2.0

Java 100.00%
kafka kafka-connect connectors http rest source connector cdc change-data-capture api

kafka-connect-http's People

Contributors

akshatjindal1, castorm, codacy-badger, dependabot-preview[bot], dependabot[bot], hackedd, lzuchowska, poulinjulien, ueisele


kafka-connect-http's Issues

Maven build failed

Describe the bug
mvn clean install failed

To Reproduce
simply run : mvn clean install

Expected behavior
The build was expected to succeed, but it fails; mvn compile succeeds on its own.

Screenshots
[INFO] Building zip: /Users/che/develop/kafka-connect-http/kafka-connect-http/target/components/packages/castorm-kafka-connect-http-0.8.12-SNAPSHOT.zip
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ kafka-connect-http ---
[INFO] Building jar: /Users/che/develop/kafka-connect-http/kafka-connect-http/target/kafka-connect-http-0.8.12-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.894 s
[INFO] Finished at: 2023-09-14T15:34:44-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-jar-plugin:3.3.0:jar (default-jar) on project kafka-connect-http: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them. -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-jar-plugin:3.3.0:jar (default-jar) on project kafka-connect-http: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them.
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:333)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
at jdk.internal.reflect.DirectMethodHandleAccessor.invoke (DirectMethodHandleAccessor.java:104)
at java.lang.reflect.Method.invoke (Method.java:578)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
Caused by: org.apache.maven.plugin.MojoExecutionException: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them.
at org.apache.maven.plugins.jar.AbstractJarMojo.execute (AbstractJarMojo.java:315)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:126)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:328)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
at jdk.internal.reflect.DirectMethodHandleAccessor.invoke (DirectMethodHandleAccessor.java:104)
at java.lang.reflect.Method.invoke (Method.java:578)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
[ERROR]
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Kafka Connect:

  • Version [e.g. 22]

Plugin:

  • Version [e.g. 22]

Additional context
Add any other context about the problem here.

Support for Microsoft's JSON date formats

I am working with an OData API that presents timestamp values such as lastModified in a format similar to `/Date(1530144000000+0530)/`. Would it be possible to support such date formats? If this is something that could be supported, I'd be happy to contribute the functionality; a steer in the right direction would certainly be appreciated.
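
For reference, a minimal standalone Java sketch (not part of the connector) of parsing that value, assuming the `/Date(millis+offset)/` shape shown above:

import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MsJsonDate {

    // Matches e.g. /Date(1530144000000+0530)/ - epoch millis plus an optional UTC offset
    private static final Pattern MS_DATE = Pattern.compile("/Date\\((-?\\d+)(?:[+-]\\d{4})?\\)/");

    public static Instant parse(String value) {
        Matcher m = MS_DATE.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a Microsoft JSON date: " + value);
        }
        // The numeric part is commonly treated as UTC epoch millis; the trailing offset is display information only
        return Instant.ofEpochMilli(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        System.out.println(parse("/Date(1530144000000+0530)/")); // 2018-06-28T00:00:00Z
    }
}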

Handle records in reverse chronological order

Problem

Some APIs don't offer control over the ordering of results.

This might be especially common on APIs that offer a snapshot of the N most recent records.

The source connector bases its offset advancement on chronologically ordered results, so the latest committed offset would be that of the oldest record instead of the newest. Ignoring this would have undesired consequences:

  • when the offset is used in the query, it would advance one record at a time, requiring as many queries as there are records in the response.
  • inability to deduplicate, which would only make the problem above worse.

Potential Solutions

  • automatically sorting records in chronological order: this would be the simplest from the user's perspective, as it doesn't even require knowing about this concern; however, it would penalize performance, especially when records arrive fully reversed (see the sketch below)
  • allowing the user to specify the sorting direction of records in the response.
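
For illustration, a minimal sketch of the first option, using a hypothetical Item type rather than the connector's actual record classes:

import java.time.Instant;
import java.util.Comparator;
import java.util.List;

public class ChronologicalSort {

    // Hypothetical item shape: only the timestamp matters for offset advancement
    record Item(String key, Instant timestamp) {}

    static List<Item> sortChronologically(List<Item> page) {
        // O(n log n) in general; a fully reversed page is the worst case mentioned above
        return page.stream()
                .sorted(Comparator.comparing(Item::timestamp))
                .toList();
    }
}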

Is it possible to deal with pagination?

Hello,
I started to extract data from different APIs and I ran into an endpoint that uses pagination in this way:

JSON response:

{
  "success":true,
  "data":[
    {},
    {},
    ...
  ],
  "additional_data":{
    "pagination":{
      "start":0,
      "limit":100,
      "more_items_in_collection":true,
      "next_start":100
    }
  }
}

The endpoint allows setting 'start' and 'limit' in the GET request, but I can't use these elements as the offset in the connector configuration because my list pointer is 'data'. Is there a way to deal with pagination like this? If not, I'd be grateful for suggestions on implementing this feature.

Thanks.
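
For illustration only (this is not connector configuration), a rough Java sketch of the page-following behaviour being asked for, assuming the start/limit/next_start fields shown in the response above and a placeholder endpoint URL:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PaginationLoop {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();
        String baseUrl = "https://example.com/items"; // placeholder endpoint
        int start = 0;
        int limit = 100;

        while (true) {
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create(baseUrl + "?start=" + start + "&limit=" + limit)).build();
            JsonNode body = mapper.readTree(
                    client.send(request, HttpResponse.BodyHandlers.ofString()).body());

            for (JsonNode item : body.path("data")) {
                System.out.println(item); // here a connector would emit records to Kafka
            }

            JsonNode pagination = body.path("additional_data").path("pagination");
            if (!pagination.path("more_items_in_collection").asBoolean(false)) {
                break; // no further pages
            }
            start = pagination.path("next_start").asInt();
        }
    }
}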

Broken link: The "See examples" link on https://castorm.github.io/kafka-connect-http/ is broken

Describe the bug
On https://castorm.github.io/kafka-connect-http, the example section says

Examples

See [examples](https://castorm.github.io/kafka-connect-http/examples), e.g.

Clicking on https://castorm.github.io/kafka-connect-http/examples brings me to a 404.

To Reproduce
On https://castorm.github.io/kafka-connect-http, click on the word "examples".

Expected behavior
I expected that it would bring me to a top-level examples page.

Screenshots
click
404

Additional context
Documentation broken link.

Add support for unordered results

Is your feature request related to a problem? Please describe.
I haven't seen an option to order the results the API returns. I am working with an API where the results are not ordered, and none of the options in the http.response.list.order.direction config allows them to be correctly sorted.

Describe the solution you'd like
Perhaps another option such as ASC_FORCED_BY_TIMESTAMP (or ASC_FORCED_BY_KEY) could be added to sort the results based on that property.

Describe alternatives you've considered
I have not found any alternatives yet.

Additional context
I saw your answer to issue #126; maybe you don't see a fit for this in the project. I have already worked on a solution and will open a pull request very soon.

HTTP source or sink not picking up the HTTP proxy defined at system level on RHEL 8

Is your feature request related to a problem? Please describe.
In staging, with open internet access, this configuration works, but in pre-prod, with restricted internet access and an http_proxy IP/port defined at the system level, the connector does not pick it up. Because of this, it cannot connect to the right destination/source.
Note: with a cURL command I am able to get the expected response, but with the HTTP source/sink connector I get the error below:
java.net.SocketTimeoutException: connect timed out

Describe the solution you'd like
Could the configuration below be added? (A client-side sketch follows.)
#################################
http.proxy.host=localhost
http.proxy.port=3128
http.proxy.user=proxyuser
http.proxy.password=proxypassword
###################################
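
For illustration, a minimal sketch of how an OkHttp client could be wired with the proposed settings; whether and how the connector would expose its underlying client this way is an assumption here:

import java.net.InetSocketAddress;
import java.net.Proxy;
import okhttp3.Authenticator;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.Route;

public class ProxyClientSketch {

    static OkHttpClient buildClient(String host, int port, String user, String password) {
        Authenticator proxyAuth = (Route route, Response response) -> {
            // Respond to 407 Proxy Authentication Required with basic credentials
            String credential = Credentials.basic(user, password);
            return response.request().newBuilder()
                    .header("Proxy-Authorization", credential)
                    .build();
        };
        return new OkHttpClient.Builder()
                .proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port)))
                .proxyAuthenticator(proxyAuth)
                .build();
    }

    public static void main(String[] args) throws Exception {
        OkHttpClient client = buildClient("localhost", 3128, "proxyuser", "proxypassword");
        try (Response response = client.newCall(
                new Request.Builder().url("https://example.com").build()).execute()) {
            System.out.println(response.code());
        }
    }
}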

Describe alternatives you've considered
With a cURL command it works.


[DepShield] (CVSS 5.5) Vulnerability due to usage of junit:junit:4.12

Vulnerabilities

DepShield reports that this application's usage of junit:junit:4.12 results in the following vulnerability(s):


Occurrences

junit:junit:4.12 is a transitive dependency introduced by the following direct dependency(s):

org.testcontainers:testcontainers:1.15.1
        └─ junit:junit:4.12

com.github.castorm:kafka-connect-http-infra:0.8.9-SNAPSHOT
        └─ org.testcontainers:testcontainers:1.15.1
              └─ junit:junit:4.12

This is an automated GitHub Issue created by Sonatype DepShield. Details on managing GitHub Apps, including DepShield, are available for personal and organization accounts. Please submit questions or feedback about DepShield to the Sonatype DepShield Community.

Unable to add kafka-connect-http as a Gradle dependency

I am unable to add kafka-connect-http as a Gradle dependency as follows:

dependencies {
    implementation(
            [group: "com.github.castorm", name: "kafka-connect-http", version: kafka_connect_http_version]
    )
}

The problem appears to be that kafka-connect-http-0.8.9.pom refers to its parent project using a relative path (../pom.xml) which is not available in the Maven Central repository:

<parent>
    <artifactId>kafka-connect-http-parent</artifactId>
    <groupId>com.github.castorm</groupId>
    <version>0.8.9</version>
    <relativePath>../pom.xml</relativePath>
</parent>

The solution is (probably?) to publish the parent POM along with the rest of the artifacts, assuming there's nothing "secret" in it.
Until/unless this is done, the only way to express this dependency is to include the kafka-connect-http.jar in the project and depend on it as a file, which is not ideal for obvious reasons. It does not appear to be possible to avoid this by excluding transitive dependencies.
The raw build output follows.

$ ./gradlew clean build
> Task :modules:lib-kafka-plugins:compileTestJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':modules:lib-kafka-plugins:compileTestJava'.
> Could not resolve all files for configuration ':modules:lib-kafka-plugins:testCompileClasspath'.
   > Could not resolve com.github.castorm:kafka-connect-http:0.8.9.
     Required by:
         project :modules:lib-kafka-plugins
      > Could not resolve com.github.castorm:kafka-connect-http:0.8.9.
         > Could not parse POM https://repo.maven.apache.org/maven2/com/github/castorm/kafka-connect-http/0.8.9/kafka-connect-http-0.8.9.pom
            > Could not find com.github.castorm:kafka-connect-http-parent:0.8.9.
              Searched in the following locations:
                - https://repo.maven.apache.org/maven2/com/github/castorm/kafka-connect-http-parent/0.8.9/kafka-connect-http-parent-0.8.9.pom
                - https://pkgs.dev.azure.com/mjensen65816/dsds/_packaging/dsds/maven/v1/com/github/castorm/kafka-connect-http-parent/0.8.9/kafka-connect-http-parent-0.8.9.pom

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 2s
4 actionable tasks: 3 executed, 1 up-to-date

Add Bearer Token support for HTTP authentication

Is your feature request related to a problem? Please describe.
Currently it only supports Basic authentication, not Bearer tokens.

Describe the solution you'd like
Add the BearerHttpAuthenticator and its config

Describe alternatives you've considered

Additional context

Header values cannot contain commas

It would appear to be impossible to use http.request.headers to build a header which contains a comma in its value, since TemplateHttpRequestFactory uses ConfigUtils.breakDownHeaders() to parse comma-separated headers without any provision for allowing commas to be escaped. It ultimately uses String.split(",") to do this.

This presents a problem, for example, if one wants to include an If-Modified-Since header in the request. The recommended date format for this header (see RFC 2616, section 3.3.1 Full Date) somewhat stupidly includes a comma between the abbreviated day-of-week name and the date/time parts. It's kind of silly that the day-of-week is in there, but it is.

If-Modified-Since is a very convenient header to include in order to minimize network traffic. It would be nice if it were supported in its "preferred" form.

It seems that the only change necessary would be to change the itemSplitter regex in ConfigUtils.breakDownHeaders() to only split on commas that do not follow some escape character. I am not familiar enough with the code base and typical usage patterns, however, to know whether this would present risk elsewhere.
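
For illustration, a minimal standalone sketch of that suggestion, splitting only on commas that are not preceded by a backslash; the choice of backslash as the escape character is an assumption:

import java.util.Arrays;
import java.util.List;

public class HeaderSplitSketch {

    // Split on commas not preceded by a backslash, then unescape "\," back to ","
    static List<String> splitHeaders(String headers) {
        return Arrays.stream(headers.split("(?<!\\\\),"))
                .map(h -> h.replace("\\,", ",").trim())
                .toList();
    }

    public static void main(String[] args) {
        String headers = "If-Modified-Since: Sat\\, 29 Oct 1994 19:43:31 GMT, Accept: application/json";
        splitHeaders(headers).forEach(System.out::println);
        // If-Modified-Since: Sat, 29 Oct 1994 19:43:31 GMT
        // Accept: application/json
    }
}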

Enable HttpSourceConnector partitioning

Use case

As a user
I want to perform several requests with a different set of parameters (partitions) against the same API from the same connector
So that the number of running connectors doesn't grow linearly with respect to the number of partitions

Using a source topic to create the body request and receive the response

Hello, I am looking to:

  1. Use my "source" topic to feed a json payload into the body of the HTTP request.

  2. Receive and store the response in an output topic. (input = request, output = response)

I believe the confluent http connector does this, but it is a paid plugin unfortunately.

I don't see a way in the documentation to use a topic value as the body; any suggestions here?

Thank you,

Matt G.

Releases later than 0.3.5 do not contain the connector

Hello,

First of all, thanks for the connector and the work on it. While testing, I noticed that the latest releases (after 0.3.5) of the connector do not contain the jar, only the source code.

According to the README, this is where the latest version should be retrieved. Did you change the site where you upload the connector?
If so, where can we find it? If not, what is needed to automate the release so that it contains the jar?

How to transform with REST API

Hello,
I've been struggling with applying the ExtractField transform to a simple REST API response.
This is the REST API response:

    "d": {
        "results": [
            {
                "__metadata": {
                    "uri": "https://some_uri.com",
                    "type": "c4codata.ServiceRequest",
                    "etag": "W/\"datetimeoffset'2021-10-26T10%3A21%3A19.3991180Z'\""
                },
                "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
                "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000013597241",

With the configuration file containing this property:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
When I try to extract a field, I get an error: Unknown field: fieldname or Expected Struct, found String.
How should I work with transforms on fields, when the REST API returns such a JSON?
Is there a way to get into deeper fields?
I've looked at issue #19 , however this solution didn't work for me either.
What I want to extract is the whole value list, without the __metadata field, and to flatten some specific fields.
Expected result:

{ "ChangedByCustomerIndicator" : false,
  "ResponseByProcessorDateTimeContent" : "", ... }

The 'base' output, without any transforms, value converters or response mappers:

{
  "value": "{\"__metadata\":{\"uri\":\"https://some_uri.com\",\"type\":\"c4codata.ServiceRequest\",\"etag\":\"W/\\\"datetimeoffset'2021-10-19T10%3A11%3A42.3716190Z'\\\"\"},\"ObjectID\":\"00163EADAC081EEC8C988DA861144CDB\",\"ProductRecipientPartyID\":\"000000000000000000000000000000000000000000000000000014555600\", .... }}",
  "key": {
    "string": "00163EADAC081EEC8C988DA861144CDB"
  },
  "timestamp": {
    "long": 1634638091225
  }
}

The result I have managed to achieve by adding:

"http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
{
  "__metadata": {
    "uri": "https://some_uri.com",
    "type": "c4codata.ServiceRequest",
    "etag": "W/\"datetimeoffset'2021-10-28T10%3A06%3A09.6248630Z'\""
  },
  "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
  "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000010005577",
  "ProductRecipientPartyUUID": "888888-111111-000000-222222",
  "ProductRecipientPartyName": "DFJSOWJRFNLDARI02URJE",
  "IncidentServiceIssueCategoryID": "VAPROGIJAL",

Duplicated records with low `http.timer.interval.millis`

Describe the bug
When the http.timer.interval.millis value is lower than the offset.flush.interval.ms Kafka Connect worker setting, the records in the topic are duplicated every http.timer.interval.millis until the offset is committed.

To Reproduce
Set http.timer.interval.millis connector config to 1000 ms.
Use the default offset.flush.interval.ms worker setting of 60000 ms.

Expected behavior
Records should appear once in the target topic.
For the poll, the offset from the last committed record should be used.
Updating the offset value should be done at the start of each poll instead of during commit.

Kafka Connect:

  • Version 3.4.0

Plugin:

  • Version 0.8.11

Additional context
Add any other context about the problem here.

version values are not loaded

Describe the bug
Error loading version.properties (com.github.castorm.kafka.connect.common.VersionUtils)

To Reproduce
Download the zip and copy it to a folder, then use that folder path as the plugin path for Kafka Connect. Start the connector after setting the necessary properties in the config files.

Expected behavior
Connection should be established to the Kafka topic mentioned in the properties file

URL encoding of query parameters

Is your feature request related to a problem? Please describe.
I need to implement CDC, but the data field I need to pass into the query params is not URL encoded, and so I get a 400 error from the server.

Describe the solution you'd like
URL encode the query parameters within the connector.
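
A minimal standalone sketch of the requested behaviour (not connector code), encoding a parameter value before it is placed on the query string:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class QueryParamEncoding {

    public static void main(String[] args) {
        String raw = "updated >= 2020-05-08T07:55:44Z"; // spaces, '>' and ':' are not URL-safe
        // URLEncoder performs application/x-www-form-urlencoded encoding (space becomes '+')
        String encoded = URLEncoder.encode(raw, StandardCharsets.UTF_8);
        System.out.println("q=" + encoded); // q=updated+%3E%3D+2020-05-08T07%3A55%3A44Z
    }
}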

Describe alternatives you've considered
None I could think of.

java.lang.NoClassDefFoundError: freemarker/template/Configuration

I am getting the following error while starting my source connector:

[2022-03-14 09:48:10,669] INFO [http-source|task-0] TemplateHttpRequestFactoryConfig values:
http.request.body =
http.request.headers = Accept:application/json
http.request.method = GET
http.request.params =
http.request.template.factory = class com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory
http.request.url = http://localhost:8080/api/messages
(com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig:376)
[2022-03-14 09:48:10,671] ERROR [http-source|task-0] WorkerSourceTask{id=http-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
java.lang.NoClassDefFoundError: freemarker/template/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:390)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig.<init>(TemplateHttpRequestFactoryConfig.java:66)
at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory.configure(TemplateHttpRequestFactory.java:48)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at com.github.castorm.kafka.connect.http.HttpSourceConnectorConfig.<init>(HttpSourceConnectorConfig.java:71)
at com.github.castorm.kafka.connect.http.HttpSourceTask.start(HttpSourceTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: freemarker.template.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
[2022-03-14 09:48:10,675] INFO [http-source|task-0] [Producer clientId=connector-producer-http-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)

My Source connector properties are:

name=http-source
kafka.topic=http-messages
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
http.request.url=http://localhost:8080/api/messages
http.request.headers=Accept:application/json
http.timer.interval.millis=30000

I am using the following version:

castorm-kafka-connect-http-0.8.11

I can also see freemarker-2.3.31.jar in my libraries with others:
image

Environment Details:
OS - Windows10
Java 1.8
Confluent version - 7.0.1 (standalone)
Http connector version - v0.8.11

Support for paginated response from API

Is your feature request related to a problem? Please describe.
Recently I have been working on a project that requires the HTTP source connector, but the API returns only a set number of responses as an array and provides a URL at the end to access the next set. If there were a way to keep calling the next-page URL until there are no new records, it would solve the problem.

Describe the solution you'd like
I would like to work on this by adding a way to extract the next-page URL while it exists (or until no new records are present) and keep pushing each page's records to Kafka.

Describe alternatives you've considered
I have also tried collecting all the messages and pushing them at once when no next-page URL remains, but that leads to memory issues if the number of pages is very large.

Confluent Hub Update

Describe the bug
I noticed that the version published on Confluent Hub is 0.6.1, not the latest version.

Could you publish the latest version of the plugin? Using Confluent Hub to install connectors enables a simpler workflow for Docker image generation.

Great connector BTW.

Maven javadoc build fails when using JDK 11

Describe the bug
Uninstall JDK 1.8. Install JDK 11.0.7. Perform a "maven package" or "maven install". Javadoc build fails.

Found fix
Add:
<configuration>
    <source>8</source>
</configuration>

to the <artifactId>maven-javadoc-plugin</artifactId> section of the pom.xml file.

Project now builds successfully using JDK 11.

Plugin:
Version 0.7.4

Setting up the http.request.body field

Hello!

First, thank you for the library; it's really great to have a free HTTP connector (compared to the paid one from Confluent). My first question is: how can I put the message's payload into the body of the request? I'm expecting something like:

"http.request.body": "${message.payload}"

or similar (assuming that the messages from a topic are in JSON). But this doesn't work.

The more general question is: how can I know all the possible fields that I can reference in the template? I know you use FreeMarker as the template engine, but what is the context for it?

Thank you,
Ivan

Make timestamp parser able to use optional property?

In our datasource, we have two timestamps, "createdAt" and "updatedAt". The problem is that "updatedAt" is not available on new objects. They only get that property once they have been saved at least once after their creation.

So, using the following offset pointer will break on new objects, but work fine otherwise.
"http.response.record.offset.pointer": "key=/id, timestamp=/updatedAt"

Would it be possible to override some of the classes to work around this?
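
For illustration, a minimal standalone sketch (not the connector's actual parser classes) of the fallback such an override would need, preferring /updatedAt and falling back to /createdAt:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FallbackTimestamp {

    // Prefer /updatedAt, fall back to /createdAt when the former is absent
    static String timestampOf(JsonNode record) {
        JsonNode updated = record.at("/updatedAt");
        return (updated.isMissingNode() ? record.at("/createdAt") : updated).asText();
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode fresh = mapper.readTree("{\"id\":1,\"createdAt\":\"2021-05-19T00:00:00Z\"}");
        JsonNode saved = mapper.readTree(
                "{\"id\":2,\"createdAt\":\"2021-05-19T00:00:00Z\",\"updatedAt\":\"2021-05-20T00:00:00Z\"}");
        System.out.println(timestampOf(fresh)); // 2021-05-19T00:00:00Z
        System.out.println(timestampOf(saved)); // 2021-05-20T00:00:00Z
    }
}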

Is pre-authentication against a second url supported?

Hi,
As per our architecture, each of our APIs needs an access token to access its data. This means I have to first hit the security API, which gives me an access token, and then the main API, which gives me the data back. Does this connector support this?

For eg:

  1. Hit https:///security-api/api (this gives me back an access token)
  2. Hit the Resource API https:///customers (I have to pass the access token from the first API in the header of this call; a rough sketch follows below)
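
For illustration only, a rough sketch of that two-step flow with placeholder URLs and an assumed access_token field name; whether the connector supports this today is a separate question:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PreAuthSketch {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();

        // Step 1: hit the security API and read the access token from its JSON response
        HttpResponse<String> tokenResponse = client.send(
                HttpRequest.newBuilder(URI.create("https://security.example.com/security-api/api")).build(),
                HttpResponse.BodyHandlers.ofString());
        String accessToken = mapper.readTree(tokenResponse.body())
                .path("access_token").asText(); // field name is an assumption

        // Step 2: call the resource API, passing the token in the Authorization header
        HttpResponse<String> data = client.send(
                HttpRequest.newBuilder(URI.create("https://api.example.com/customers"))
                        .header("Authorization", "Bearer " + accessToken)
                        .build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(data.body());
    }
}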

Repeated API calls upon data stream update exceeding API quota

Describe the bug
I am now getting locked out of the NY Times API with a 429 response code and a "Rate limit quota violation. Quota limit exceeded" error.

To Reproduce
Use the following source connector config
nytimes_connector_quota_problem_config.txt

Expected behavior
To not exceed the API's rate limit.

Kafka Connect:

  • Version 5.5.0

Plugin:

  • Version 0.7.5

Additional context
The throttle limit for the NY Times API is up to 10 requests per minute or 4,000 per day. I created a new app to get a new API key and waited a full day to ensure I hadn't hit the daily limit for some reason. I also turned up the throttling to poll only once every 5 minutes. I think that when the NY Times updates its data stream, the connector is for some reason hitting the API endpoint more than 10 times in a minute.

Node not found - parse error - flat array

The data payload coming from the HTTP Endpoint does not have a "root" level key. The JSON array is flat.

[{key:value, key:value, key:value} , {key:value, key:value, key:value} ]

Error thrown is :

[2024-02-02 13:29:29,257] ERROR WorkerSourceTask{id=netreo-beta-test-2.http.source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
java.lang.IllegalArgumentException: No node at '/create_time' (unmatched part: '/create_time')

Here is my config:

{
"name": "netreo-beta-test-2.http.source",
"config": {
"connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
"tasks.max": "1",
"http.offset.initial": "timestamp=2020-05-08T07:55:44Z",
"http.request.params": "jql=create_time>=${offset.timestamp?datetime.iso?long}",
"http.request.url": "https://qa-system/fw/index.php?r=restful/devices/list",
"http.request.headers": "Accept: application/json",
"http.auth.type": "Basic",
"http.auth.password": "PASSWORD",
"http.response.record.offset.pointer": "key=/, timestamp=/create_time",
"http.timer.interval.millis": "30000",
"http.timer.catchup.interval.millis": "1000",
"kafka.topic": "beta-test-2"
}
}

How to map the individual JSON fields to topic fields?

I may be missing something in the documentation, and if so, my apologies, but when I get the data and put it into the topic, it has all of the JSON in a single VALUE field. I would like to map the individual JSON fields to their own topic message fields. Is that possible with this connector?

Each JSON response is packed into "payload.value" as an invalid JSON string

Describe the bug
The JSON documents returned by the REST API call are recorded to the Kafka topic in a single field of "payload.value" with extra backslashes and double quote characters. This behavior renders the string invalid JSON when passed to Elasticsearch and proper document field:value pairs are not created.

To Reproduce
I didn't track down the same exact record through the process. Examples are attached.

Use this source config:
nytimes_source_connector_config.txt

Getting this JSON response:
nytimes_response.txt

An example entry in the Kafka topic is created:
kafka_topic_entry.txt

Use this sink config:
nytime_sink_connector_config.txt

An example Elasticsearch document is created that is unparsed:
elasticsearch_document.txt

Expected behavior
The JSON responses are stored in the Kafka topic as valid JSON without the extra double quotes and backslashes that turn the response JSON into a very long string.

Kafka Connect:
Using the confluent-platform-2.12. All components are at version 5.5.0

Plugin:
Self-compiled 0.7.1 using Java JDK 1.8.0_251

Additional context
Perhaps I am simply misunderstanding how to properly configure the connector. I've tried many variations of configuration options and SMT's to try and work around this behavior.

DepShield encountered errors while building your project

The project could not be analyzed because of build errors. Please review the error messages here. Another build will be scheduled when a change to a manifest file* occurs. If the build is successful this issue will be closed, otherwise the error message will be updated.

This is an automated GitHub Issue created by Sonatype DepShield. GitHub Apps, including DepShield, can be managed from the Developer settings of the repository administrators.

* Supported manifest files are: pom.xml, package.json, package-lock.json, npm-shrinkwrap.json, Cargo.lock, Cargo.toml, main.rs, lib.rs, build.gradle, build.gradle.kts, settings.gradle, settings.gradle.kts, gradle.properties, gradle-wrapper.properties, go.mod, go.sum

Maven package build fails on JDK 11.0.11

Describe the bug
It's the same bug as #22, only a newer version.

See that issue for further details.

Additional context
JDK version 11.0.11 (current Ubuntu 20.04LTS default)

Quickfix:
Changing ${java.version} to 11 resolves the issue. Perhaps it would be possible to take just the major version of Java instead of the entire version including the bugfix number?

Debugging in IntelliJ

A quick question: how do I debug this in IntelliJ without exporting a jar to Kafka's libs folder every time? I am using Apache Kafka and want to step through the code flow in IntelliJ. Is there any way to do this?

Handling multi-line JSON response

Is there a way to handle a response that contains multiple JSON objects, one per line? This would not be a valid JSON document, and is not an array.
Example:

{"foo": "bar", "id": 1}
{"foo": "bar", "id": 2}
{"foo": "bar", "id": 3}

It appears the default behavior is that only the first JSON object is parsed into a record.

Thanks for any ideas.
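
For what it's worth, a minimal standalone sketch (outside the connector) of splitting such a newline-delimited body into individual JSON records with Jackson:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class NdJsonSketch {

    static List<JsonNode> parseLines(String body) {
        ObjectMapper mapper = new ObjectMapper();
        return body.lines()
                .filter(line -> !line.isBlank())
                .map(line -> {
                    try {
                        return mapper.readTree(line); // one JSON object per line
                    } catch (Exception e) {
                        throw new IllegalArgumentException("Invalid JSON line: " + line, e);
                    }
                })
                .toList();
    }

    public static void main(String[] args) {
        String body = "{\"foo\": \"bar\", \"id\": 1}\n{\"foo\": \"bar\", \"id\": 2}\n{\"foo\": \"bar\", \"id\": 3}";
        parseLines(body).forEach(System.out::println);
    }
}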

AWS SigV4 authentication

Describe the solution you'd like
I would like to connect to Elasticsearch, but the authentication needs to be AWS SigV4.

Describe alternatives you've considered
I have a custom Python script to fetch data from Elasticsearch.

Offset isn't committed directly

Describe the bug
For some reason, offsets aren't committed immediately, which leaves the connector using the initial timestamp forever. Looking further at the logs, I found that the commit method (which is overridden) is never called.

My logs show that the timestamp is successfully extracted from the data, but it is never replaced, so the initial timestamp is always used.

[2021-05-19 13:16:39,116] INFO Record timestamp: 2021-05-19T00:00:00Z (com.github.castorm.kafka.connect.http.HttpSourceTask)
[2021-05-19 13:16:39,116] INFO Request for offset {timestamp=2020-01-01T00:00:01Z} yields 1/1 new records (com.github.castorm.kafka.connect.http.HttpSourceTask)
[2021-05-19 13:16:39,119] INFO Commit record timestamp: 2021-05-19T00:00:00Z (com.github.castorm.kafka.connect.http.HttpSourceTask)
...
[2021-05-19 13:16:39,116] INFO Request for offset {timestamp=2020-01-01T00:00:01Z} yields 1/1 new records (com.github.castorm.kafka.connect.http.HttpSourceTask)

Expected behavior
Offset committed after each record.

HTTP Streams

Is your feature request related to a problem? Please describe.
I haven't seen an option in this connector for infinitely streaming data. An example can be found in the Twitter API, which sends a gzipped stream of tweets: https://developer.twitter.com/en/docs/twitter-api/enterprise/powertrack-api/api-reference/powertrack-stream#Stream

Describe the solution you'd like
Perhaps a boolean option when creating the connector to indicate that we're connecting to an infinite stream of data. Additionally, maybe another option to support decoding data formats such as gzip.

Describe alternatives you've considered
This appears to be an unmaintained version, only supporting an older Twitter streaming API: https://github.com/jcustenborder/kafka-connect-twitter

Additional context
I can see a lot of people using this connector if it's able to support HTTP streaming, such as the Twitter APIs

Does the connector support HTTPS connection

Hey @castorm,

I was looking in the docs and I don't see an entry regarding HTTPS support. I was wondering: does the connector support TLS/SSL requests?
If you have never tested it, I can do so on my side and update the documentation accordingly.

The reason for the question: we want to poll Elasticsearch with your connector and we are wondering whether this would be feasible with TLS/SSL without updating the HTTP client.

Regards,
Gil

JIRA example with propertie file

Describe the bug
I tried to use the JIRA example with a standalone Connect worker and a locally hosted JIRA. During start-up, the templating seems to fail (the following has evaluated to null or missing: ==> timestamp).

[2020-07-07 17:36:46,617] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:214)
[2020-07-07 17:36:47,256] ERROR Error executing FreeMarker template (freemarker.runtime:60)
FreeMarker template error:
The following has evaluated to null or missing:
==> timestamp  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 17]

----
Tip: If the failing expression is known to legally refer to something that's sometimes null or missing, either specify a default value like myOptionalVar!myDefault, or use <#if myOptionalVar??>when-present<#else>when-missing</#if>. (These only cover the last step of the expression; to cover the whole expression, use parenthesis: (myOptionalVar.foo)!myDefault, (myOptionalVar.foo)??
----

----
FTL stack trace ("~" means nesting-related):
        - Failed at: ${timestamp?datetime.iso?string["yyyy...  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 15]
----

Java stack trace (for programmers):
----
freemarker.core.InvalidReferenceException: [... Exception message was already printed; see it above ...]
        at freemarker.core.InvalidReferenceException.getInstance(InvalidReferenceException.java:134)
        at freemarker.core.EvalUtil.coerceModelToTextualCommon(EvalUtil.java:481)
        at freemarker.core.EvalUtil.coerceModelToPlainText(EvalUtil.java:455)
        at freemarker.core.Expression.evalAndCoerceToPlainText(Expression.java:117)
        at freemarker.core.BuiltInsForMultipleTypes$dateBI._eval(BuiltInsForMultipleTypes.java:253)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.Dot._eval(Dot.java:41)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.BuiltInsForMultipleTypes$stringBI._eval(BuiltInsForMultipleTypes.java:765)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.DynamicKeyName._eval(DynamicKeyName.java:61)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.DollarVariable.calculateInterpolatedStringOrMarkup(DollarVariable.java:100)
        at freemarker.core.DollarVariable.accept(DollarVariable.java:63)
        at freemarker.core.Environment.visit(Environment.java:334)
        at freemarker.core.Environment.visit(Environment.java:340)
        at freemarker.core.Environment.process(Environment.java:313)
        at freemarker.template.Template.process(Template.java:383)
        at com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory.apply(FreeMarkerTemplateFactory.java:55)
        at com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory.lambda$create$0(FreeMarkerTemplateFactory.java:44)
        at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory.createRequest(TemplateHttpRequestFactory.java:64)
        at com.github.castorm.kafka.connect.http.HttpSourceTask.poll(HttpSourceTask.java:97)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2020-07-07 17:36:47,263] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-07-07 17:36:47,263] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-07-07 17:36:47,263] ERROR WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
FreeMarker template error:
The following has evaluated to null or missing:
==> timestamp  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 17]

To Reproduce
I used the provided JIRA example (JSON) to derive the following property file:

name=sample-search-issues.jira.http.source
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.offset.initial="timestamp=2020-05-08T07:55:44Z"
http.request.url=http://localhost:8080/rest/api/2/search
http.request.headers="Authorization: Basic xyz, Accept: application/json, Accept-Encoding: *"
http.request.params="jql=updated>=${timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']} ORDER BY updated ASC&maxResults=100"
http.response.list.pointer=/issues
http.response.record.key.pointer=/id
http.response.record.timestamp.pointer=/fields/updated
http.throttler.interval.millis=30000
http.throttler.catchup.interval.millis=1000
kafka.topic=jira

Expected behavior
"CDC-like" polling of JIRA API

Kafka Connect:

  • Version 2.5

Plugin

  • castorm-kafka-connect-http-0.7.6

[DepShield] (CVSS 7.5) Vulnerability due to usage of org.apache.commons:commons-compress:1.20

Vulnerabilities

DepShield reports that this application's usage of org.apache.commons:commons-compress:1.20 results in the following vulnerability(s):


Occurrences

org.apache.commons:commons-compress:1.20 is a transitive dependency introduced by the following direct dependency(s):

org.testcontainers:testcontainers:1.16.0
        └─ org.apache.commons:commons-compress:1.20

com.github.castorm:kafka-connect-http-infra:0.8.12-SNAPSHOT
        └─ org.testcontainers:testcontainers:1.16.0
              └─ org.apache.commons:commons-compress:1.20

This is an automated GitHub Issue created by Sonatype DepShield. Details on managing GitHub Apps, including DepShield, are available for personal and organization accounts. Please submit questions or feedback about DepShield to the Sonatype DepShield Community.

Authorization header

When setting an Authorization header:

http.request.headers=Authorization=Bearer xxxxxxxx

I got this error:

java.lang.IllegalStateException: Incomplete pair: Authorization=Bearer xxxxxxxx

I also tried enclosing the value in double or single quotes.
The HTTP service I'm scraping also allows passing the authorization token as an HTTP query argument, and it works that way.
Thanks!
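
For comparison, the other header configurations in this page (for example the JIRA example above) separate the header name from its value with a colon rather than an equals sign, so the equivalent setting might look like the line below; this is an observation from those examples, not a confirmed fix:

http.request.headers=Authorization: Bearer xxxxxxxx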
