confluentinc / examples
Apache Kafka and Confluent Platform examples and demos
License: Apache License 2.0
I want to try producing messages to a Confluent Kafka topic using librdkafka.
I have cloned the git repository.
I get the following error when running "make":
~/examples/clients/cloud/c$ make
cc consumer.c common.c json.c -o consumer -lrdkafka -lm
common.c: In function ‘create_topic’:
common.c:134:9: error: unknown type name ‘rd_kafka_NewTopic_t’; did you mean ‘rd_kafka_topic_t’?
rd_kafka_NewTopic_t *newt;
^~~~~~~~~~~~~~~~~~~
rd_kafka_topic_t
common.c:138:15: error: unknown type name ‘rd_kafka_CreateTopics_result_t’
const rd_kafka_CreateTopics_result_t *res;
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
common.c:139:15: error: unknown type name ‘rd_kafka_topic_result_t’
const rd_kafka_topic_result_t **restopics;
^~~~~~~~~~~~~~~~~~~~~~~
common.c:145:16: warning: implicit declaration of function ‘rd_kafka_NewTopic_new’; did you mean ‘rd_kafka_topic_new’? [-Wimplicit-function-declaration]
newt = rd_kafka_NewTopic_new(topic, num_partitions, replication_factor,
^~~~~~~~~~~~~~~~~~~~~
rd_kafka_topic_new
common.c:145:14: warning: assignment makes pointer from integer without a cast [-Wint-conversion]
newt = rd_kafka_NewTopic_new(topic, num_partitions, replication_factor,
^
common.c:157:9: warning: implicit declaration of function ‘rd_kafka_CreateTopics’; did you mean ‘rd_kafka_queue_poll’? [-Wimplicit-function-declaration]
rd_kafka_CreateTopics(rk, &newt, 1, NULL, queue);
^~~~~~~~~~~~~~~~~~~~~
rd_kafka_queue_poll
common.c:159:9: warning: implicit declaration of function ‘rd_kafka_NewTopic_destroy’; did you mean ‘rd_kafka_topic_destroy’? [-Wimplicit-function-declaration]
rd_kafka_NewTopic_destroy(newt);
^~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_topic_destroy
common.c:181:15: warning: implicit declaration of function ‘rd_kafka_event_CreateTopics_result’; did you mean ‘rd_kafka_event_topic_partition’? [-Wimplicit-function-declaration]
res = rd_kafka_event_CreateTopics_result(rkev);
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_event_topic_partition
common.c:181:13: warning: assignment makes pointer from integer without a cast [-Wint-conversion]
res = rd_kafka_event_CreateTopics_result(rkev);
^
common.c:186:21: warning: implicit declaration of function ‘rd_kafka_CreateTopics_result_topics’; did you mean ‘rd_kafka_event_topic_partition’? [-Wimplicit-function-declaration]
restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_event_topic_partition
common.c:186:19: warning: assignment makes pointer from integer without a cast [-Wint-conversion]
restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
^
common.c:189:13: warning: implicit declaration of function ‘rd_kafka_topic_result_error’; did you mean ‘rd_kafka_topic_destroy’? [-Wimplicit-function-declaration]
if (rd_kafka_topic_result_error(restopics[0]) ==
^~~~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_topic_destroy
common.c:192:25: warning: implicit declaration of function ‘rd_kafka_topic_result_name’; did you mean ‘rd_kafka_topic_name’? [-Wimplicit-function-declaration]
rd_kafka_topic_result_name(restopics[0]));
^~~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_topic_name
common.c:191:41: warning: format ‘%s’ expects argument of type ‘char *’, but argument 3 has type ‘int’ [-Wformat=]
fprintf(stderr, "Topic %s already exists\n",
~^
%d
rd_kafka_topic_result_name(restopics[0]));
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
common.c:196:25: warning: implicit declaration of function ‘rd_kafka_topic_result_error_string’; did you mean ‘rd_kafka_event_error_string’? [-Wimplicit-function-declaration]
rd_kafka_topic_result_error_string(restopics[0]));
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rd_kafka_event_error_string
common.c:194:58: warning: format ‘%s’ expects argument of type ‘char *’, but argument 3 has type ‘int’ [-Wformat=]
fprintf(stderr, "Failed to create topic %s: %s\n",
~^
%d
rd_kafka_topic_result_name(restopics[0]),
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
common.c:194:62: warning: format ‘%s’ expects argument of type ‘char *’, but argument 4 has type ‘int’ [-Wformat=]
fprintf(stderr, "Failed to create topic %s: %s\n",
~^
%d
common.c:196:25:
rd_kafka_topic_result_error_string(restopics[0]));
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
common.c:199:41: warning: format ‘%s’ expects argument of type ‘char *’, but argument 3 has type ‘int’ [-Wformat=]
fprintf(stderr, "Topic %s successfully created\n",
~^
%d
rd_kafka_topic_result_name(restopics[0]));
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Makefile:10: recipe for target 'consumer' failed
make: *** [consumer] Error 1
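One possible cause, stated as an assumption because the post does not give the installed version: rd_kafka_NewTopic_t and rd_kafka_CreateTopics belong to librdkafka's Admin API, which as far as I know first shipped in 0.11.5, so an older distro package would produce exactly these "unknown type name" errors. A minimal sketch for comparing an installed version against that threshold (version_ge, check_librdkafka and MIN_LIBRDKAFKA are names made up for this example, and sort -V assumes GNU coreutils or a compatible sort):

```shell
#!/bin/sh
# Returns 0 when dotted version $1 is greater than or equal to version $2.
version_ge() {
    # sort -V orders version strings; if the smallest of the two is $2,
    # then $1 is at least $2.
    [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

# Assumed minimum version for the Admin API; confirm against the
# librdkafka release notes before relying on it.
MIN_LIBRDKAFKA="0.11.5"

# Report whether the given installed version is new enough.
check_librdkafka() {
    installed="$1"
    if version_ge "$installed" "$MIN_LIBRDKAFKA"; then
        echo "librdkafka $installed: Admin API should be available"
    else
        echo "librdkafka $installed is older than $MIN_LIBRDKAFKA: upgrade before running make"
    fi
}
```

On a system where librdkafka installs its pkg-config file, something like check_librdkafka "$(pkg-config --modversion rdkafka)" would feed in the real version; that invocation is an assumption about the local setup.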
check_env should validate curl is installed. A minimal Ubuntu 18.04.2 LTS installation does not install curl by default.
$ check_running_cp 5.2
This script expects Confluent Platform version 5.2 but the running version is .
To proceed please either: change the examples repo branch to or update the running Confluent Platform to version 5.2.
If you look at the output of confluent version, you can see the error:
$ confluent version
'confluent' requires 'curl'.
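A minimal sketch of the kind of pre-flight check requested here. It is not the repository's actual check_env; missing_commands and require_commands are hypothetical helper names, and the list of tools to pass in is up to the script:

```shell
#!/bin/sh
# Print the name of every command in the argument list that is not on PATH.
missing_commands() {
    for cmd in "$@"; do
        # command -v is the POSIX way to test whether a command can be found.
        command -v "$cmd" >/dev/null 2>&1 || printf '%s\n' "$cmd"
    done
}

# Hypothetical helper: fail early with a clear message instead of letting a
# later step print an empty version string.
require_commands() {
    missing=$(missing_commands "$@")
    if [ -n "$missing" ]; then
        # Unquoted on purpose so the names are joined on one line.
        echo "ERROR: missing required command(s):" $missing >&2
        return 1
    fi
    return 0
}
```

A script could then start with require_commands curl || exit 1, so that a minimal Ubuntu install stops with a message naming curl rather than reporting a blank running version.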
Hi,
I am getting the following message when I run examples-5.3.0-post/security/rbac/scripts/enable-rbac-broker.sh.
When I log on to MDS at http://localhost:8090 from the console as user mds/mds1, I get the following message:
{"status_code":404,"message":"HTTP 404 Not Found","type":"CLIENT_ERROR"}
I waited until all the processing completed and restarted afterwards, but I still see the same error messages. My CONFLUENT_HOME and PATH are set properly, and the expect binary exists in the path as well.
Is there anything I am doing wrong that triggers this behaviour? This is on macOS.
https://github.com/confluentinc/examples/blob/5.2.1-post/ccloud/acl.sh .
Line 164 API_SECRET_SA=$(echo "$OUTPUT" | grep "| Secret" | awk '{print $4;}')
The double quotes around $OUTPUT cause a bogus string to be written to client.config.
I changed the code to
API_SECRET_SA=$(echo "$OUTPUT" | grep '| Secret' | awk '{print $4;}')
This is the same as for API_KEY, and it now works: client.config is built with the correct API_SECRET_SA value.
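As an alternative that does not depend on counting whitespace-separated fields, here is a sketch that splits each table row on the "|" character and looks the value up by its label. The table layout and the sample values below are made up for illustration (they are not real ccloud output), and table_value is a hypothetical helper name:

```shell
#!/bin/sh
# Extract the value column for a given label from a two-column ASCII table.
# Splitting on "|" avoids depending on how many words the label contains.
table_value() {
    label="$1"
    awk -F'|' -v label="$label" '
        {
            key = $2; val = $3
            gsub(/^[ \t]+|[ \t]+$/, "", key)   # trim the label cell
            gsub(/^[ \t]+|[ \t]+$/, "", val)   # trim the value cell
            if (key == label) print val
        }'
}

# Made-up sample in the assumed layout.
OUTPUT='+---------+------------------+
| API Key | EXAMPLEKEY       |
| Secret  | example+secret/1 |
+---------+------------------+'

API_KEY_SA=$(printf '%s\n' "$OUTPUT" | table_value "API Key")
API_SECRET_SA=$(printf '%s\n' "$OUTPUT" | table_value "Secret")
```

With this approach the same function serves both the key and the secret, so the two extractions cannot drift apart.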
In the clickstream example I noticed that the EVENT_TS field in the CLICK_USER_SESSIONS table reflects the time the table was created rather than the start of the window.
I created the CLICK_USER_SESSIONS table at 10:57:
CREATE TABLE CLICK_USER_SESSIONS AS
SELECT username, WindowStart() AS EVENT_TS,
count(*) AS events
FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;
Now at 11:20 I check records in this table:
ksql> print CLICK_USER_SESSIONS limit 5;
Format:JSON
{"ROWTIME":1584087650626,"ROWKEY":"DimitriSchenz88\u0000\u0000\u0001p??QB\u0000\u0000\u0001p??-U","USERNAME":"DimitriSchenz88","EVENT_TS":1584086265173,"EVENTS":1358}
{"ROWTIME":1584087650991,"ROWKEY":"Roberto_123\u0000\u0000\u0001p??R?\u0000\u0000\u0001p??**","USERNAME":"Roberto_123","EVENT_TS":1584086264362,"EVENTS":3442}
{"ROWTIME":1584087651103,"ROWKEY":"BenSteins_235\u0000\u0000\u0001p??S\u001F\u0000\u0000\u0001p??/?","USERNAME":"BenSteins_235","EVENT_TS":1584086265760,"EVENTS":1373}
{"ROWTIME":1584087651700,"ROWKEY":"Oriana_70\u0000\u0000\u0001p??Ut\u0000\u0000\u0001p??,\u0007","USERNAME":"Oriana_70","EVENT_TS":1584086264839,"EVENTS":3297}
{"ROWTIME":1584087651901,"ROWKEY":"AndySims_345324\u0000\u0000\u0001p??V=\u0000\u0000\u0001p??*?","USERNAME":"AndySims_345324","EVENT_TS":1584086264535,"EVENTS":1379}
The EVENT_TS timestamps in these records point to Fri Mar 13 2020 10:57:44.
What is going wrong? Why are the records in CLICK_USER_SESSIONS not related to the start of the window?
11:08 $ cat docker-compose.yml | grep image
image: confluentinc/cp-zookeeper:5.4.1
image: confluentinc/cp-server:5.4.1
image: confluentinc/cp-schema-registry:5.4.1
image: confluentinc/cp-server-connect:5.4.1
image: confluentinc/cp-ksql-server:5.4.1
image: docker.elastic.co/elasticsearch/elasticsearch:6.3.0
image: grafana/grafana:5.2.4
image: docker.elastic.co/kibana/kibana:6.3.0
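To compare EVENT_TS and ROWTIME by eye, it helps to decode the epoch milliseconds. The sketch below does that with plain shell arithmetic (ms_to_utc_time is a made-up helper name). My reading, offered as an assumption rather than a confirmed diagnosis: for a SESSION window, WindowStart() is the timestamp of the first event in the session, so a user who never pauses for more than 300 seconds stays in one session whose start remains the moment the first click was processed.

```shell
#!/bin/sh
# Convert epoch milliseconds to an HH:MM:SS time of day in UTC using only
# shell arithmetic, so it does not depend on GNU or BSD date flags.
ms_to_utc_time() {
    secs=$(( $1 / 1000 % 86400 ))   # seconds since midnight UTC
    printf '%02d:%02d:%02d\n' $(( secs / 3600 )) $(( secs % 3600 / 60 )) $(( secs % 60 ))
}

ms_to_utc_time 1584086265173   # EVENT_TS of the first record above
ms_to_utc_time 1584087650626   # ROWTIME of the first record above
```

This prints 07:57:45 and 08:20:50 in UTC, which lines up with the 10:57 and 11:20 local times in the report if the reporter is at UTC+3 (inferred, not stated). The roughly 23 minute gap is then the age of a session that is still open.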
The existing delta script uses the stand alone Replicator, but a general Connect cluster is much more useful :)
Tried this on master branch as well as 5.4.x. Not able to get the stack up.
sudo docker-compose up -d
Pulling zookeeper-west (confluentinc/cp-zookeeper:5.4.0-SNAPSHOT)...
ERROR: manifest for confluentinc/cp-zookeeper:5.4.0-SNAPSHOT not found
https://github.com/confluentinc/examples/tree/5.3.1-post/microservices-orders
I am new to Kafka and confused by so many things.
I am using the Rust programming language to write microservices and want to use Kafka for messaging and so on.
I'm not sure if it's a good idea to build API-based microservices in Rust using Rocket and have the microservices communicate with each other using Kafka messaging and the Streams API. Also, I'm not sure how efficiently I can fetch records based on multiple joins etc. to respond to the client.
What is the better approach?
/Shiv
I was trying out the "multi-datacenter" demo, and it looks like the YAML and scripts do not support macOS.
I was using docker-machine, so Docker and the containers all run in a VirtualBox machine. In docker-compose.yml the Kafka brokers advertise themselves as localhost:<port>, which does not work with VirtualBox because the VM has its own IP.
C3 didn't show any clusters after deployment [RBAC Docker].
I tried logging on to C3 using the Confluent CLI as professor and granting access to a user.
I am not able to see any targets in Control Center. Is there anything I need to do to get this working?
RBAC console producer/consumer not working
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic1 --from-beginning --consumer.config ../delta_configs/client.properties.delta
[2019-08-17 02:17:08,066] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:412)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:502)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:209)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:185)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:549)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:249)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:326)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
Docker container 'connect' is starting but dying shortly afterwards.
Environment:
MacBook Pro
MacOs Version: 10.14.6
docker version
Client: Docker Engine - Community
Version: 19.03.5
API version: 1.40
Go version: go1.12.12
Git commit: 633a0ea
Built: Wed Nov 13 07:22:34 2019
OS/Arch: darwin/amd64
Experimental: false
Server: Docker Engine - Community
Engine:
Version: 19.03.5
API version: 1.40 (minimum version 1.12)
Go version: go1.12.12
Git commit: 633a0ea
Built: Wed Nov 13 07:29:19 2019
OS/Arch: linux/amd64
Experimental: false
containerd:
Version: v1.2.10
GitCommit: b34a5c8af56e510852c35414db4c1f4fa6172339
runc:
Version: 1.0.0-rc8+dev
GitCommit: 3e425f80a8c931f88e6d94a8c831b9d5aa481657
docker-init:
Version: 0.18.0
GitCommit: fec3683
13:59 $ docker-compose up -d
...
docker ps shows connect not running
Then tried starting connect individually in foreground.
14:02 $ docker-compose up connect
Last few lines of output:
connect | [2019-12-20 20:02:47,714] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-12-20 20:02:47,714] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-12-20 20:02:47,714] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-12-20 20:02:47,714] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-12-20 20:02:47,715] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-12-20 20:02:47,715] INFO Loading plugin from: /usr/share/java/kafka-connect-storage-common (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect exited with code 137
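A note on reading that status: exit codes above 128 conventionally mean the process was terminated by signal (code minus 128). The sketch below only decodes the number; describe_exit is a made-up helper name.

```shell
#!/bin/sh
# Describe a process exit status the way shells and Docker report it:
# values above 128 mean "terminated by signal (status - 128)".
describe_exit() {
    status="$1"
    if [ "$status" -gt 128 ]; then
        echo "terminated by signal $(( status - 128 ))"
    else
        echo "exited normally with status $status"
    fi
}

describe_exit 137   # status reported for the connect container
```

Signal 9 is SIGKILL. On Docker Desktop a common reason, though only an assumption here, is the kernel killing the JVM because the memory allotted to Docker is too small; docker inspect --format '{{.State.OOMKilled}}' on the stopped container should say whether that happened.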
I have decoupled the docker-compose setup to run it on VMs in different regions. I am trying to create a topic using this command:
kafka-topics --create \
> --bootstrap-server <my_broker>:9092 \
> --topic single-region \
> --partitions 1 \
> --replica-placement single.json \
> --config min.insync.replicas=1
Here is my JSON:
{
"version": 1,
"replicas": [
{
"count": 2,
"constraints": {
"rack": "west"
}
}
]
}
Here is the error I am receiving:
> --bootstrap-server <hostname>:9092 \
> --topic single-region \
> --partitions 1 \
> --replica-placement single.json \
> --config min.insync.replicas=1
Error while executing topic command : org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.placement.constraints
[2019-12-17 12:02:34,784] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.placement.constraints
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:283)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:235)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:230)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:258)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.placement.constraints
(kafka.admin.TopicCommand$)
Here is the log from kafka endpoint
[2019-12-17 12:09:27,788] INFO [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='single-region', numPartitions=1, replicationFactor=-1, assignments=[], configs=[CreateableTopicConfig(name='confluent.placement.constraints', value='{"version":1,"replicas":[{"count":1,"constraints":{"rack":"west"}}],"observers":[]}'), CreateableTopicConfig(name='min.insync.replicas', value='1')]) (kafka.server.AdminManager)
Kafka version : 5.4.0-ccs-beta1
I have used the following versions to install confluent-kafka
$ wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
$ sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/beta201908/deb/5.4 stable main"
$ sudo apt-get update && sudo apt-get install confluent-platform-2.12
With the packages from this repo, the --replica-placement option is not available.
I used this doc for reference:
https://docs.confluent.io/current/release-notes/5-4-preview.html
98b14bbfd50d confluentinc/cp-kafka-connect:5.3.1 "/etc/confluent/dock…" 12 minutes ago Exited (1) 7 minutes ago docker-compose_kafka-connect-cp_1
Using examples/cp-all-in-one/docker-compose.yml on branch 5.3.2-post, schema-registry does not respond to GET requests on localhost:8081/subjects; you get an ERR_EMPTY_RESPONSE error in Chrome.
Using 'curl localhost:8081/subjects' you get:
curl: (52) Empty reply from server
If you 'docker exec -it containerid sh' into the running schema registry container, it responds to 'curl localhost:8081/subjects' as you would expect, with a 200 response.
docker-compose ps shows (all other services are up too) :
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
I am creating a source connector and I need it to access the internet. How do I configure it on docker-compose?
I'm trying 'CDC with Postgres' example with docker but was getting an error.
The example page:
https://github.com/confluentinc/examples/blob/5.3.0-post/postgres-debezium-ksql-elasticsearch/postgres-debezium-ksql-elasticsearch-docker-short.adoc
$ cd docker-compose
$ docker-compose up -d
...
Pulling kafka-connect-cp (confluentinc/cp-kafka-connect:5.3.0)...
5.3.0: Pulling from confluentinc/cp-kafka-connect
e7a7e6031030: Already exists
9ce894b58e49: Already exists
0f0f22e55d98: Already exists
207846bb27c4: Already exists
be811e98eba6: Already exists
431754c05b84: Pull complete
7e828836ea40: Pull complete
6105fe867172: Pull complete
bdd0c2c6cfe2: Pull complete
Digest: sha256:5417fae094a535da4e51aa243c8003eb9d6dc85ab2adf75af079d5dd31706319
Status: Downloaded newer image for confluentinc/cp-kafka-connect:5.3.0
Pulling ksql-server (confluentinc/ksql-cli:5.3.0)...
ERROR: manifest for confluentinc/ksql-cli:5.3.0 not found: manifest unknown: manifest unknown
Any idea what the issue could be?
https://github.com/confluentinc/examples/blob/5.2.1-post/ccloud/acl.sh
Line 315
ccloud kafka acl create --allow --service-account-id $SERVICE_ACCOUNT_ID --operation READ --consumer-group java_example_group_1
Creates an ACL for java_example_group_1 but the java client hard codes group to demo-consumer-1
Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: demo-consumer-1
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.750 s
[INFO] Finished at: 2019-06-13T18:27:42+00:00
[INFO] Final Memory: 26M/377M
https://github.com/confluentinc/examples/blob/5.2.1-post/ccloud/acl.sh
Line 315 ccloud kafka acl create --allow --service-account-id $SERVICE_ACCOUNT_ID --
https://github.com/confluentinc/examples/blob/5.2.1-post/clients/cloud/java/src/main/java/io/confluent/examples/clients/cloud/ConsumerExample.java
I changed the group.id to match the ACL
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-1"); CHANGED to
but we never saw the expected output "Successfully joined group" so acl.sh threw an error
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (default-cli) on project clients-example: An exception occured while executing the Java class. null: InvocationTargetException: Not authorized to access group: demo-consumer-1 -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch
Currently I am using this command to create a topic in a specific region using the replica placement flag:
kafka-topics --create --bootstrap-server mybroker:9092 --topic single-region --partitions 1 --replica-placement topic-config/multi-region-async.json --config min.insync.replicas=1
Can I leverage the same in my producer API? Which clients support this option?
I get the following error when I run /examples/security/rbac/rbac-docker/confluent-start.sh:
Pulling zookeeper (368821881613.dkr.ecr.us-west-2.amazonaws.com/confluentinc/cp-zookeeper:5.3.x-latest)...
ERROR: Get https://368821881613.dkr.ecr.us-west-2.amazonaws.com/v2/confluentinc/cp-zookeeper/manifests/5.3.x-latest: no basic auth credentials
Problem:
When starting the Confluent Platform in Docker and working with connector related commands in KSQL-CLI, the following error occurs:
io.confluent.ksql.util.KsqlServerException: org.apache.http.conn.HttpHostConnectException: Connect to localhost:8083 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to
localhost:8083 [localhost/127.0.0.1] failed: Connection refused (Connection
refused)
Caused by: Could not connect to the server.
Caused by: Could not connect to the server.
Reproduce:
Start the confluent platform:
git clone https://github.com/confluentinc/examples
cd examples
git checkout 5.4.0-post
cd cp-all-in-one/
docker-compose up -d
Start KSQL-CLI:
docker run -it --rm --name ksql-cli-1 --network cp-all-in-one_default confluentinc/cp-ksql-cli:5.4.0 http://ksql-server:8088
Try to run list connectors & create a new connector:
list connectors;
CREATE SOURCE CONNECTOR example WITH(
"connector.class"='FileStreamSource',
"tasks.max"=1,
"file"='/etc/hostname',
"topic"='connect-test'
);
These commands fail with the error above
I get this error even though I have cleaned up:
ERROR Error while creating ephemeral at /brokers/ids/2, node already exists and owner '144115218547277831' does not match current session '216172788743670313' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2019-12-18 02:45:10,121] ERROR [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
I got the following error when running start.sh within examples/wikipedia:
../utils/helper.sh: line 68: jq: command not found
(23) Failed writing body
Hi,
I'm trying to execute example https://github.com/confluentinc/examples/blob/5.3.0-post/security/rbac/rbac-docker/README.md and I got this error:
$ ./confluent-start.sh
Generating public and private keys for token signing
Generating RSA private key, 2048 bit long modulus
............................................................+++
...............+++
e is 65537 (0x10001)
writing RSA key
Starting Zookeeper, OpenLDAP and Kafka with MDS
Pulling zookeeper (368821881613.dkr.ecr.us-west-2.amazonaws.com/confluentinc/cp-zookeeper:5.3.x-latest)...
ERROR: Get https://368821881613.dkr.ecr.us-west-2.amazonaws.com/v2/confluentinc/cp-zookeeper/manifests/5.3.x-latest: no basic auth credentials
Any ideas?
Thank you
I am trying to run the simple Avro producer under
examples/clients/avro/src/main/java/io/confluent/examples/clients/basicavro/ProducerExample.java
I am using Gradle, and I was not able to add the Avro Gradle plugin to the build file to convert the avsc file to Java classes.
I created the Payment class manually and tried to run the code, but I get the error below.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:129)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:76)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:703)
at Test.ProducerExample.main(ProducerExample.java:39)
..
Can you please confirm whether it is mandatory to generate the Java class from the avsc file, and whether the code won't work with a manually created Java class?
Also, can you share how we can achieve this with Gradle?
Sometimes Java classes can be complex. For example, you want a Date/Instant object, but Avro doesn't have a date type, so it should actually be encoded as a long. Rather than having Producer and Consumer application logic handle that conversion, it can be done by Encoders in the Reflection API.
Provide an example for ReflectData, which extends SpecificData and makes it easier to migrate existing Jackson/Gson POJOs into this model than writing up an Avro schema.
The docker-compose.yml files refer to version 5.3.0, which is not the latest version available. They should be updated to the latest version, with some cadence.
Example 2, Example 3a, and Example 3b use the same graphic in the README.
Since these are different example scenarios, should the graphics show something different?
Hello,
Is there any way to create an easy example with (Debezium) MSSQL + Kafka + Elastic + (KSQL) in a Docker Compose YAML file?
It's very hard to find a working implementation to test and stress.
BR
cd postgres-debezium-ksql-elasticsearch/
cd docker-compose
After docker-compose up -d:
$ docker-compose ps
Name                               Command                          State    Ports
docker-compose_connect-debezium_1  /docker-entrypoint.sh start      Up       0.0.0.0:8083->8083/tcp, 8778/tcp, 9092/tcp, 9779/tcp
docker-compose_datagen-ratings_1   bash -c echo Waiting for K ...   Up
docker-compose_kafka-connect-cp_1  /etc/confluent/docker/run        Exit 1
docker-compose_kafka_1             /etc/confluent/docker/run        Up       0.0.0.0:9092->9092/tcp
docker-compose_ksql-cli_1          /bin/sh                          Up
docker-compose_ksql-server_1       /etc/confluent/docker/run        Up       8088/tcp
docker-compose_postgres_1          docker-entrypoint.sh postgres    Up       5432/tcp
docker-compose_schema-registry_1   /etc/confluent/docker/run        Up       0.0.0.0:8081->8081/tcp
docker-compose_zookeeper_1         /etc/confluent/docker/run        Up       2181/tcp, 2888/tcp, 3888/tcp
elasticsearch                      /usr/local/bin/docker-entr ...   Up       0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kibana                             /usr/local/bin/kibana-docker     Up       0.0.0.0:5601->5601/tcp
Note that docker-compose_kafka-connect-cp_1 (/etc/confluent/docker/run) is in state Exit 1.
It would be nice to have such issues corrected, so that newbies like us can easily pick up from these examples.
Currently, in CP-ALL-IN-ONE, idempotency can’t be tested out of the box since transaction.state.log.min.isr and transaction.state.log.replication.factor are left to defaults, which will cause failure to create the __transaction_state internal topic required for idempotency. A simple modification of the docker-compose file can modify this, but adding this out of the box to all one-node based deployments we give as examples can reduce friction for testing.
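For anyone making that modification by hand: the Confluent Kafka Docker images take broker properties as environment variables, formed (as I understand the image documentation) by prefixing KAFKA_, upper-casing, and turning dots into underscores. The sketch below derives the names for the two settings mentioned above; kafka_env_name is a made-up helper, and the value 1 is an assumption that only suits single-broker demos.

```shell
#!/bin/sh
# Map a broker property name to the environment variable form used by the
# Confluent Kafka Docker images: prefix KAFKA_, uppercase, dots to underscores.
# (Properties containing "-" or "_" need extra escaping that this sketch skips.)
kafka_env_name() {
    printf 'KAFKA_%s\n' "$(printf '%s' "$1" | tr '.' '_' | tr '[:lower:]' '[:upper:]')"
}

# Single-broker values; 1 is an assumption suited to one-node demos only.
for prop in transaction.state.log.min.isr transaction.state.log.replication.factor; do
    echo "$(kafka_env_name "$prop")=1"
done
```

The two printed lines are what would go under the broker service's environment section in docker-compose.yml, rewritten in that file's key: value form.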
Receiving this error trying to use the Multiregion example.
$ docker-compose up -d
Pulling zookeeper-west (confluentinc/cp-zookeeper:5.4.0-SNAPSHOT)...
manifest for confluentinc/cp-zookeeper:5.4.0-SNAPSHOT not found: manifest unknown: manifest unknown
When I try to run the example examples/postgres-debezium-ksql-elasticsearch,
the KSQL and datagen containers exit with the following logs:
[kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.20.0.4:29092) could not be established. Broker may not be available.
[kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.20.0.4:29092) could not be established. Broker may not be available.
[kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.20.0.4:29092) could not be established. Broker may not be available.
Waiting for Confluent Schema Registry to be ready...
Unexpected response with code: 407 and content <some html content of my company's proxy server blocking access to http://schema-registry:8081/config>
I'm behind a proxy, which I have set up correctly in Docker, but it looks like the container is trying to hit the URL http://schema-registry:8081/config and is being refused.
The following hosts are bypassed from the proxy server -> localhost,127...,172...,schema-registry
Are there any specific connection settings that I need to be aware of, or proxy bypass settings?
I am getting the following error message.
I'm following these steps:
https://github.com/confluentinc/examples/blob/latest/postgres-debezium-ksql-elasticsearch/postgres-debezium-ksql-elasticsearch-docker-short.adoc
AVSC is nice, but JSON is verbose and can be consolidated using Avro IDL (AVDL) syntax.
I see there are examples for JDBC and Elastic, so perhaps the storage based connectors can be included as well?
HDFS: A Hadoop installation shouldn't be required since the HDFS URI can be set to the local filesystem. I've tested with hdfs.url=file:///tmp/kafka-connect, and it worked as expected.
S3: An AWS account isn't required. See - https://blog.minio.io/journaling-kafka-messages-with-s3-connector-and-minio-83651a51045d
I have some workable code for at least S3 here - https://github.com/cricket007/kafka-connect-sandbox
Extras: Use something like Apache Drill to connect to the local Zookeeper Server and run Drill SQL queries over data (can query JSON and Avro files)
The KSQL persistent streams example in the docs is broken:
https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#
It says to do CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid WHERE gender = 'FEMALE';
Which to get working I had to change to: CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.rowkey WHERE gender = 'FEMALE';
Just a heads up about a bit of sales funnel friction for a noob here!
Example 2, Example 3a, and Example 3b here use the same graphic (link below).
Since these are different example scenarios, should the graphics show something different?
Graphic used for Example 2, 3a, and 3b
Maybe include the different transforms? This is a (highly simplified) example graphic that has transforms shown: https://i0.wp.com/www.alternatestack.com/wp-content/uploads/2017/10/Screen-Shot-2017-10-27-at-1.03.40-AM.png
I downloaded the sample security code, tried to run it, and got the following error:
"kafka failed to log into your metadata server. please check all parameters and run again"
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.daFfsIy0
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
Sogin /apps/kafka/confluent-5.3.0/etc/kafka/server.properties /tmp/original_configs/server.properties ../delta_configs/server.properties.delta
Sleeping 5 seconds before login
Sleeping 5 seconds before login
./rbac_lib.sh: line 54: expect: command not found
Failed to log into your Metadata Server. Please check all parameters and run again
[root@ftc-lbkfkdv204 scripts]#
I tried the Avro example with the following steps but it failed with an error. Could you check if I am missing something?
$ git clone https://github.com/confluentinc/cp-docker-images.git
$ cd cp-docker-images
$ git checkout 5.2.0-post
$ cd examples/cp-all-in-one/
$ docker-compose up --build
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema":"{\"namespace\":\"io.confluent.examples.clients.basicavro\",\"type\":\"record\",\"name\":\"Payment\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
  http://localhost:8081/subjects/transactions-value/versions
{"id":1}
$ git clone https://github.com/confluentinc/examples.git
$ cd examples
$ git checkout 5.2.0-post
$ cd clients/avro
$ mvn clean compile package
$ mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: osx
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 10.14
[INFO] os.detected.version.major: 10
[INFO] os.detected.version.minor: 14
[INFO] os.detected.classifier: osx-x86_64
[INFO]
[INFO] ---------------< io.confluent:java-client-avro-examples >---------------
[INFO] Building java-client-avro-examples 5.2.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.5.0:java (default-cli) @ java-client-avro-examples ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
$ mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: osx
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 10.14
[INFO] os.detected.version.major: 10
[INFO] os.detected.version.minor: 14
[INFO] os.detected.classifier: osx-x86_64
[INFO]
[INFO] ---------------< io.confluent:java-client-avro-examples >---------------
[INFO] Building java-client-avro-examples 5.2.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.5.0:java (default-cli) @ java-client-avro-examples ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[WARNING]
java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
at java.lang.Thread.run (Thread.java:834)
Caused by: java.lang.IllegalStateException: No entry found for connection 2147483646
at org.apache.kafka.clients.ClusterConnectionStates.nodeState (ClusterConnectionStates.java:339)
at org.apache.kafka.clients.ClusterConnectionStates.disconnected (ClusterConnectionStates.java:143)
at org.apache.kafka.clients.NetworkClient.initiateConnect (NetworkClient.java:926)
at org.apache.kafka.clients.NetworkClient.ready (NetworkClient.java:287)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect (ConsumerNetworkClient.java:548)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:676)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:656)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess (RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess (RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion (ConsumerNetworkClient.java:575)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests (ConsumerNetworkClient.java:389)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady (AbstractCoordinator.java:235)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:317)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded (KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1195)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1135)
at io.confluent.examples.clients.basicavro.ConsumerExample.main (ConsumerExample.java:38)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
at java.lang.Thread.run (Thread.java:834)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.895 s
[INFO] Finished at: 2019-04-03T18:17:26+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.5.0:java (default-cli) on project java-client-avro-examples: An exception occured while executing the Java class. null: InvocationTargetException: No entry found for connection 2147483646 -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
The solution for the FraudService causes an NPE in the aggregate because total.getValue() could be null.
Solution:
final KTable<Windowed<Long>, OrderValue> aggregate = orders
    .groupBy((id, order) -> order.getCustomerId(), Grouped.with(Serdes.Long(), Schemas.Topics.ORDERS.getValueSerde()))
    .windowedBy(SessionWindows.with(Duration.ofHours(1)))
    .aggregate(OrderValue::new,
        (custId, order, total) -> {
            // Treat a missing running total as 0 to avoid the NPE
            return new OrderValue(order, (total.getValue() == null ? 0 : total.getValue()) + order.getQuantity() * order.getPrice());
        },
        (k, a, b) -> simpleMerge(a, b),
        Materialized.with(null, new JsonSerdes<OrderValue>(OrderValue.class)));
The simpleMerge also doesn't aggregate the first order because it sets the value to 0 if a is null.
private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
    return new OrderValue(b.getOrder(), (a == null ? 0D : a.getValue()) + b.getValue());
}
should be:
private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
    return new OrderValue(b.getOrder(), ((a == null || a.getValue() == null) ? b.getValue() : a.getValue() + b.getValue()));
}
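The null handling proposed above can be tried in isolation. The following is a minimal sketch, not the example's actual code: the OrderValue class here is a hypothetical stand-in that keeps only an order label and a nullable Double value, and the class name SimpleMergeSketch is made up for illustration.

```java
public class SimpleMergeSketch {

    // Hypothetical stand-in for the example's OrderValue; only the
    // nullable running value and an order label are modelled.
    static final class OrderValue {
        private final String order;
        private final Double value;

        OrderValue(final String order, final Double value) {
            this.order = order;
            this.value = value;
        }

        String getOrder() {
            return order;
        }

        Double getValue() {
            return value;
        }
    }

    // Mirrors the proposed fix: when the left side is missing or has no
    // value yet, keep the right side's value instead of unboxing null.
    static OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
        final Double merged = (a == null || a.getValue() == null)
            ? b.getValue()
            : Double.valueOf(a.getValue() + b.getValue());
        return new OrderValue(b.getOrder(), merged);
    }

    public static void main(final String[] args) {
        final OrderValue empty = new OrderValue("none", null);
        final OrderValue first = new OrderValue("order-1", 5.0);
        final OrderValue second = new OrderValue("order-2", 7.5);

        System.out.println(simpleMerge(null, first).getValue());
        System.out.println(simpleMerge(empty, first).getValue());
        System.out.println(simpleMerge(first, second).getValue());
    }
}
```

Note that this sketch, like the proposed fix, still assumes b.getValue() is non-null whenever a has a value.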
When running the multi-region docker-compose setup, I get under-replicated partitions, specifically for the multi-region-async topic.
I tried to debug this by using replica.max.lag.ms, but the issue persists.
The following command lists the under-replicated partitions:
docker-compose exec broker-east-3 kafka-topics --describe --under-replicated-partitions --zookeeper zookeeper-central:2182
Output:
Topic: multi-region-async Partition: 0 Leader: 1 Replicas: 1,2,3,4 Isr: 1,2 Offline:
Hello Team,
Does Kafka support multiple interceptors for Producer and Consumer? I have created one custom interceptor and also want to use the one provided by Kafka. I have added both to a list:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(producerInterceptor.class.getName(), AuditProducerInterceptor.class.getName()));
However, with this my Kafka producer is not getting created.
Can anyone suggest a solution please? Is there any different way with which we can specify multiple interceptors?
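For reference, the interceptor.classes setting (the key behind ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) is a list-typed config, so it can also be given as a single comma-separated string. The sketch below only builds the Properties object using the JDK; the two class names are hypothetical placeholders, and whether this resolves the producer creation failure above is not confirmed by the issue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class InterceptorConfigSketch {

    // Builds client properties with several interceptors listed under the
    // "interceptor.classes" key as one comma-separated string.
    static Properties withInterceptors(final List<String> classNames) {
        final Properties props = new Properties();
        props.setProperty("interceptor.classes", String.join(",", classNames));
        return props;
    }

    public static void main(final String[] args) {
        // Hypothetical class names, for illustration only.
        final Properties props = withInterceptors(Arrays.asList(
            "com.example.FirstProducerInterceptor",
            "com.example.AuditProducerInterceptor"));
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```

Each listed class still has to be on the classpath and implement the matching interceptor interface (ProducerInterceptor for producers, ConsumerInterceptor for consumers).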
So I created a java app with kafka and use docker-compose.
In my .yml I launch a ZooKeeper server and 3 brokers. The problem is that even with one broker, it doesn't work if the port is not 9092. The error is "[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.".
So I can't create multiple brokers, because the client seems to connect to port 9092, as the error says.
Here is how a broker is declared in the yml file.
kafka-1:
image: confluentinc/cp-kafka:5.2.1
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
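The error message shows the client dialing localhost:9092, which is what it was given as bootstrap.servers. A rough sketch of the client side follows, assuming (this is not stated in the issue) that each broker is mapped to and advertises its own host port, for example 9092, 9093 and 9094. The class and method names are made up for illustration, and only the JDK is used.

```java
import java.util.Properties;
import java.util.StringJoiner;

public class BootstrapServersSketch {

    // Joins host:port pairs into the comma-separated form expected by the
    // "bootstrap.servers" client setting.
    static String bootstrapServers(final String host, final int... ports) {
        final StringJoiner joiner = new StringJoiner(",");
        for (final int port : ports) {
            joiner.add(host + ":" + port);
        }
        return joiner.toString();
    }

    // Client properties pointing at every broker port rather than 9092 only.
    static Properties clientProps(final String host, final int... ports) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers(host, ports));
        return props;
    }

    public static void main(final String[] args) {
        // Assumed ports; they must match each broker's port mapping and
        // its KAFKA_ADVERTISED_LISTENERS value in the compose file.
        final Properties props = clientProps("localhost", 9092, 9093, 9094);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties would then be passed to the AdminClient or producer in place of a hard-coded localhost:9092.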
Good day,
I keep getting a timeout when trying to connect to Confluent Kafka using the credentials supplied by Confluent and using the following structure in the https://github.com/confluentinc/examples/tree/master/clients/cloud/nodejs project:
Example from CLI and client configuration:
bootstrap.servers=<SERVER_URL>
sasl.username=<CLUSTER_API_KEY>
sasl.password=<CLUSTER_API_SECRET>
Please can you have a look at this issue
Not really an issue, more some notes on the changes needed for Elasticsearch 7:
Remove "_default_": { and its closing braces.
Add "esVersion":70 to jsonData.
Replace "type.name": "type.name=kafkaconnect" with "type.name": "_doc". In Elasticsearch 7, they finally removed the types.
The type.name thing is a bit dirty; it would obviously be much better to have the Elasticsearch Kafka Connector work with Elasticsearch 7.
If you don't do those changes from the start, remove everything - indices and templates in Elasticsearch, sinks in Kafka, data sources in Grafana.
You may also want to restart the services.
Also, I think that in start.sh, elastic-dynamic-template.sh should be called before ksql-tables-to-grafana.sh.
Errors that happened (adding this might make it easier to find for other users):
Caused by: java.lang.RuntimeException: java.text.ParseException: Unparseable number: (error in the Elasticsearch logs)
Set fielddata=true on [IP] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory. Alternatively use a keyword field instead. (visible in Grafana and Elasticsearch log)
Rejecting mapping update to [errors_per_min] as the final mapping would have more than 1 type: [_doc, type.name=kafkaconnect (Elasticsearch log)
ksql> run script './ksql/ksql-clickstream-demo/demo/clickstream-schema.sql';
io.confluent.ksql.parser.exception.ParseFailedException: Exception while processing statements :Source CLICKSTREAM does not exist.
ksql>
This looks to be related to confluentinc/ksql#1320 and confluentinc/ksql#1599 -- KSQL is aborting because it can't find an object even though it is created as part of the script.