aliwaremq / aliware-kafka-demos
Provides demo projects for various clients to access Alibaba Cloud Message Queue for Apache Kafka.
Home Page: https://www.aliyun.com/product/kafka
All the usual exception checks have been done.
Could you run `php kafka-consumer.php` and post a screenshot of the output when it returns normally?
Found the cause:
a bug triggered by granting the sub-account administrative permissions on MQ.
Since there is no C# demo: is confluent-kafka-dotnet also compatible with Alibaba Cloud Kafka?
^
Error: Invalid value for configuration property "security.protocol"
at Error (native)
at Producer.Client (/Users/souche_/projects_souche/node_modules/node-rdkafka/lib/client.js:54:18)
at new Producer (/Users/souche_/projects_souche/node_modules/node-rdkafka/lib/producer.js:71:10)
at Object.<anonymous> (/Users/souche_/projects_souche/test/msgQueue_ALI/producter.js:6:16)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
at Function.Module._load (module.js:438:3)
at Module.runMain (module.js:604:10)
This error is thrown when running the producer from the Node.js demo. I checked the node-rdkafka module, and `sasl_ssl` is indeed a valid enum value there. Changing it to other values raises the same error.
The error is as follows:
➜ kafka git:(master) ✗ node producer.js
[ 'gzip', 'snappy', 'sasl', 'regex', 'lz4' ]
0.9.5
/Users/liuxing/Project/cryptape/minerdash/app/kafka/node_modules/node-rdkafka/lib/client.js:54
this._client = new SubClientType(globalConf, topicConf);
^
Error: Invalid value for configuration property "security.protocol"
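For what it's worth, librdkafka only accepts `security.protocol` values whose supporting features were compiled into the build, and the feature list printed above notably lacks `ssl`. A minimal sketch of that check (the list is copied from the output above; the helper name is illustrative):

```python
# Feature list reported by this node-rdkafka build (copied from the log above).
features = ['gzip', 'snappy', 'sasl', 'regex', 'lz4']

def supports_sasl_ssl(features):
    # security.protocol=sasl_ssl needs both SASL and SSL compiled in;
    # a librdkafka build without 'ssl' rejects the value as "Invalid value".
    return 'sasl' in features and 'ssl' in features

print(supports_sasl_ssl(features))  # False for this build
```

If that is the case here, rebuilding node-rdkafka against an OpenSSL-enabled librdkafka is the fix, not changing the config value.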
The PHP client publishes messages without problems. But if the topic does not exist, the script hangs for a long time in `produce(RD_KAFKA_PARTITION_UA, 0, $data)` and never proceeds. How can I set a timeout?
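One option is to bound the wait via librdkafka timeout properties, which php-rdkafka passes straight through. A sketch (the property names are real librdkafka settings; the values are illustrative):

```python
# librdkafka timeout properties that bound how long an undelivered message
# (e.g. one destined for a nonexistent topic) can keep the producer waiting.
conf = {
    "socket.timeout.ms": 10000,    # network request timeout
    "message.timeout.ms": 30000,   # max local time before a message fails
}
# php-rdkafka equivalent (illustrative):
#   $conf->set('message.timeout.ms', '30000');
print(conf["message.timeout.ms"])
```

Once `message.timeout.ms` elapses, the message fails with a timeout error through the delivery callback instead of stalling the script indefinitely.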
%3|1501655805.616|ERROR|rdkafka#producer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: SASL authentication failure: Disconnected: check client PLAIN credentials and broker logs
%3|1501655805.616|ERROR|rdkafka#producer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: 1/1 brokers are down
A similar exception: org.apache.kafka.clients.NetworkClient : Bootstrap broker kafka-ons-internet.aliyun.com:8080 disconnected
Once your kafka-java-demo runs successfully, the next step is integrating the client into your framework. I hit this exception while integrating with Spring; after repeated troubleshooting it turned out to be a Maven dependency conflict. The fix was to unify the Spring dependencies at version 4.3.9.RELEASE and change spring-kafka to 1.2.2.RELEASE; the remaining Maven dependencies can stay as they are. The POM is as follows:
<properties>
    <spring.version>4.3.9.RELEASE</spring.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-sasl-client</artifactId>
<version>0.1</version>
</dependency>
<!-- springframe start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- springframe end -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
It worked with the public beta version; after switching the configuration to the commercial version I get errors like this:
172.20.1.142:45936->172.17.102.2:9092: read: connection reset by peer
2018-07-17T04:09:37.366Z INFO kafka/log.go:36 kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=read tcp 172.20.1.142:45936->172.17.102.2:9092: read: connection reset by peer)
2018-07-17T04:09:37.366Z INFO kafka/log.go:36 kafka message: client/metadata no available broker to send metadata request to
2018-07-17T04:09:37.366Z INFO kafka/log.go:36 client/brokers resurrecting 3 dead seed brokers
2018-07-17T04:09:37.366Z INFO kafka/log.go:36 client/metadata retrying after 250ms... (1 attempts remaining)
2018-07-17T04:09:37.616Z INFO kafka/log.go:36 client/metadata fetching metadata for all topics from broker 172.17.102.3:9092
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 Failed to connect to broker 172.17.102.3:9092: read tcp 172.20.1.142:49526->172.17.102.3:9092: read: connection reset by peer
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=read tcp 172.20.1.142:49526->172.17.102.3:9092: read: connection reset by peer)
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 client/metadata fetching metadata for all topics from broker 172.17.102.1:9092
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 Failed to connect to broker 172.17.102.1:9092: read tcp 172.20.1.142:49556->172.17.102.1:9092: read: connection reset by peer
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=read tcp 172.20.1.142:49556->172.17.102.1:9092: read: connection reset by peer)
2018-07-17T04:09:37.617Z INFO kafka/log.go:36 client/metadata fetching metadata for all topics from broker 172.17.102.2:9092
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 Failed to connect to broker 172.17.102.2:9092: read tcp 172.20.1.142:45942->172.17.102.2:9092: read: connection reset by peer
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=read tcp 172.20.1.142:45942->172.17.102.2:9092: read: connection reset by peer)
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 kafka message: client/metadata no available broker to send metadata request to
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 client/brokers resurrecting 3 dead seed brokers
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 kafka message: Closing Client
2018-07-17T04:09:37.618Z ERROR kafka/client.go:74 Kafka connect fails with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018-07-17T04:09:37.618Z ERROR pipeline/output.go:74 Failed to connect: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 kafka message: Initializing new client
2018-07-17T04:09:37.618Z INFO kafka/log.go:36 client/metadata fetching metadata for all topics from broker 172.17.102.2:9092
The demos do not include a Spring Boot version, so I adapted the Spring demo myself. Spring Boot 2.0 depends on Spring 5.0, the spring-kafka version the demo provides is too old, and after upgrading spring-kafka the kafka-clients version is in turn too low. Can a newer kafka-clients be used? A demo based on Spring Boot 2.0 would be even better.
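For Spring Boot 2.x, one approach (a sketch; relying on Boot's own dependency management rather than pinning versions is an assumption about the project setup) is to omit explicit versions and let the Spring Boot BOM select a mutually compatible spring-kafka / kafka-clients pair:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- no <version>: Spring Boot's dependency management supplies one
         that matches its managed kafka-clients version -->
</dependency>
```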
The error is as follows:
SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 77ms in state SSL_HANDSHAKE)
Version info:
node-rdkafka 2.17.0
Using the confluent Go SDK client for Message Queue for Apache Kafka: building natively on each platform (darwin, windows, linux) works fine, but cross-compiling fails:
consumer.go:52:40: undefined: kafka.Consumer
consumer.go:55:19: undefined: kafka.ConfigMap
consumer.go:82:9: undefined: kafka.NewError
consumer.go:82:24: undefined: kafka.ErrUnknownProtocol
consumer.go:85:19: undefined: kafka.NewConsumer
Has anyone run into this, or found a solution?
I am using the ca-cert.pem file itself, placed in the same directory as the program.
%3|1649398843.295|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://alikafka-pre-cn-7mz2lwrke00l-3.alikafka.aliyuncs.com]: sasl_ssl://alikafka-pre-cn-7mz2lwrke00l-3.alikafka.aliyuncs.com:9093/bootstrap: SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 102ms in state CONNECT)
%3|1649398844.323|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://alikafka-pre-cn-7mz2lwrke00l-2.alikafka.aliyuncs.com]: sasl_ssl://alikafka-pre-cn-7mz2lwrke00l-2.alikafka.aliyuncs.com:9093/bootstrap: SSL handshake failed: error:1416F086:SSL
In JBoss, add the following to the standalone.xml configuration file:
<security-domain name="KafkaClient" cache-type="default">
<authentication>
<login-module code="org.apache.kafka.common.security.plain.PlainLoginModule" flag="required">
<module-option name="username" value="xxxx"/>
<module-option name="password" value="xxxx"/>
</login-module>
</authentication>
</security-domain>
Note: username is the AccessKey and password is the last 10 characters of the SecretKey; in this case sasl_mechanism must also be set to PLAIN in the code.
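The password rule above can be sketched as follows (the helper name and sample key are purely illustrative):

```python
# Per the note above, the JAAS password is the last 10 characters of the SecretKey.
def jaas_password(secret_key: str) -> str:
    return secret_key[-10:]

print(jaas_password("abcdefghijklmnop"))  # ghijklmnop
```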
The package name kafkaDemo/services is wrong; it should point at aliware-kafka-demos/kafka-go-demo/services.
%3|1509992234.463|ERROR|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: Receive failed: SSL syscall error number: 5: Connection reset by peer
%5|1509992234.463|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: Connection closed
%3|1509992234.463|ERROR|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: Connection closed
%3|1509992264.984|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: Receive failed: SSL syscall error number: 5: Connection reset by peer
%3|1509992264.984|ERROR|rdkafka#consumer-1| [thrd:sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap]: sasl_ssl://kafka-ons-internet.aliyun.com:8080/bootstrap: Receive failed: SSL syscall error number: 5: Connection reset by peer
Doesn't the Producer send a message directly to the broker that owns its partition? How is failover to other partitions achieved?
When a partition goes down, does the Producer recompute the target partition? If so, is that behavior implemented in the client?
If a partition is specified explicitly at send time, can the specified partition ever be overridden automatically?
Spring Boot 2.0 is incompatible, so I had to give up spring-kafka and use kafka-clients directly.
But that kafka-clients version is so old that consumer callbacks are unusable; I am stuck polling in a while (true) loop, which is painful.
The exact error is:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 30000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
at com.aliyun.openservices.kafka.ons.KafkaProducerDemo.main(KafkaProducerDemo.java:53)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 30000 ms.
The client versions are the recommended ones: kafka-clients 0.10.0.0 (org.apache.kafka) and slf4j-api (org.slf4j).
As per the title: I plan to use Kafka to record logs during API calls, but each send is followed by a wait of roughly 1.5 seconds. How can I configure things so that sending a message does not block?
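A common way to avoid paying a network round-trip per send is the fire-and-forget pattern: hand the record to a background sender and return immediately. The sketch below is self-contained and purely illustrative (the class name is made up; a sleep stands in for the slow network call); with the Java client, the same effect comes from not calling .get() on the future that producer.send returns.

```python
import queue
import threading
import time

class AsyncLogSender:
    """Sketch of fire-and-forget: send() only enqueues, so the caller never
    waits on delivery; a background worker performs the slow I/O."""

    def __init__(self, deliver):
        self._q = queue.Queue()
        self._deliver = deliver            # e.g. the real producer's send
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, record):
        self._q.put(record)                # returns immediately

    def _run(self):
        while True:
            record = self._q.get()
            if record is None:             # sentinel: shut down
                break
            self._deliver(record)          # slow "network" call happens here

    def close(self):
        self._q.put(None)
        self._worker.join()

delivered = []
sender = AsyncLogSender(lambda r: (time.sleep(0.05), delivered.append(r)))
start = time.time()
sender.send("api-call log line")
elapsed = time.time() - start              # far below the 50 ms delivery delay
sender.close()
print(delivered)
```

The caller-side latency is the cost of a queue insert, while delivery proceeds in the background; the trade-off is that the caller no longer learns about delivery failures synchronously.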
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at net.ios666.core.kafka.conf.Test.main(Test.java:44)
Caused by: org.apache.kafka.common.KafkaException: java.lang.SecurityException: java.io.IOException: Configuration Error:
Line 1: expected [{], found [caroot]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)
... 2 more
Caused by: java.lang.SecurityException: java.io.IOException: Configuration Error:
Line 1: expected [{], found [caroot]
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 5 more
Caused by: java.io.IOException: Configuration Error:
Line 1: expected [{], found [caroot]
at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
at sun.security.provider.ConfigFile$Spi.match(ConfigFile.java:532)
at sun.security.provider.ConfigFile$Spi.parseLoginEntry(ConfigFile.java:445)
at sun.security.provider.ConfigFile$Spi.readConfig(ConfigFile.java:427)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:329)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:271)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
... 19 more
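The parse error ("Line 1: expected [{], found [caroot]") indicates that the file referenced by java.security.auth.login.config is not in JAAS syntax: its first token is apparently "caroot", which suggests the CA certificate file was passed instead of the JAAS configuration. For reference, a well-formed kafka_client_jaas.conf looks like this (placeholder credentials):

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="yourAccessKey"
  password="yourPassword";
};
```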
I tried it; no message is sent when calling bus/refresh.
var consumer = new Kafka.KafkaConsumer({
/*'debug': 'all',*/
//'api.version.request': 'true',
'bootstrap.servers': config['bootstrap.servers'],
'security.protocol' : 'sasl_ssl',
'ssl.ca.location' : './ca-cert',
'sasl.mechanisms' : 'PLAIN',
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password'],
'group.id' : config['consumer_id']
});
In the config object the key is "bootstrap_servers".
Using the confluent Go SDK client for Message Queue for Apache Kafka over sasl_ssl: I want to run it in Docker, but certificate verification appears to fail (the certificate is the ca-cert.pem file from demo/conf).
The Dockerfile is as follows:
FROM golang:1.16 as builder
RUN apt-get update && apt-get install -y gcc ca-certificates
RUN go env -w GOPROXY="https://goproxy.cn,direct"
WORKDIR /opt/kafka
COPY . .
RUN go mod tidy
RUN go build
ENTRYPOINT ["./kafka"]
The log is as follows:
init kafka producer, it may take a few seconds to init the connection
init kafka producer success
HTTP server Start :7070
%3|1641796736.273|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://alikafka-pre-cn-8ed2i1tx7001-1.alikafka.aliyuncs.com]: sasl_ssl://alikafka-pre-cn-8ed2i1tx7001-1.alikafka.aliyuncs.com:9093/bootstrap: SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 78ms in state SSL_HANDSHAKE)
%3|1641796737.008|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://alikafka-pre-cn-8ed2i1tx7001-1.alikafka.aliyuncs.com]: sasl_ssl://alikafka-pre-cn-8ed2i1tx7001-1.alikafka.aliyuncs.com:9093/bootstrap: SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 44ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
The log suggests two possible problems: ssl.ca.location is not correctly configured, or root CA certificates are not installed.
I have confirmed the certificate path is correct.
The ca-certificates state is as follows:
apt-get install ca-certificates
Reading package lists... Done
Building dependency tree
Reading state information... Done
ca-certificates is already the newest version (20200601~deb10u2).
0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
Any idea what else could cause this error? Thanks.
The build steps no longer match the directory structure:
https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-go-demo
This concerns the partition ordering mentioned here: https://help.aliyun.com/document_detail/49319.html?spm=5176.doc29537.6.566.xBDPnV
The documentation says:
Message filtering
Kafka itself has no message-filtering semantics. In practice there are two approaches:
If there are only a few filter categories, use multiple topics to achieve the filtering.
If there are many categories, filtering is best done in the client's business layer.
Choose according to your actual business case; the two approaches can also be combined.
Note: Kafka on the cloud charges a topic resource fee of 2 RMB per day.
So Kafka apparently does not support partition ordering; yet for the message queues that do support partition ordering, Alibaba Cloud offers no Node.js SDK.
I wrote the config based on the logstash input configuration.
With sasl_mechanism => "ONS", no data can be consumed from Kafka and the logstash log shows no errors at all (logstash version 5.5.3).
sasl_mechanism => "PLAIN" works fine, so the configuration in the demo is probably wrong; please update it.
I want to try .NET Core.
from pykafka import KafkaClient
import json
import logging as log
log.basicConfig(level=log.DEBUG)
kafka_client = KafkaClient(hosts='xx:9092,xx:9092,xx:9092')
kafka_msg_topic = kafka_client.topics['product_incoming']
producer = kafka_msg_topic.get_sync_producer()
producer.produce(json.dumps({'tbl': 'product', 'ids': [239981], 'action': 'u', 'data': {'raw_price': 100, 'zk_price': 90}}))
----------------- Error log below -----------------
INFO:pykafka.cluster:Requesting API version information
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Got api version info: {0: ApiVersionsSpec(key=0, min=0, max=2), 1: ApiVersionsSpec(key=1, min=0, max=2), 2: ApiVersionsSpec(key=2, min=0, max=0), 3: ApiVersionsSpec(key=3, min=0, max=1), 4: ApiVersionsSpec(key=4, min=0, max=0), 5: ApiVersionsSpec(key=5, min=0, max=0), 6: ApiVersionsSpec(key=6, min=0, max=2), 7: ApiVersionsSpec(key=7, min=1, max=1), 8: ApiVersionsSpec(key=8, min=0, max=2), 9: ApiVersionsSpec(key=9, min=0, max=1), 10: ApiVersionsSpec(key=10, min=0, max=0), 11: ApiVersionsSpec(key=11, min=0, max=0), 12: ApiVersionsSpec(key=12, min=0, max=0), 13: ApiVersionsSpec(key=13, min=0, max=0), 14: ApiVersionsSpec(key=14, min=0, max=0), 15: ApiVersionsSpec(key=15, min=0, max=0), 16: ApiVersionsSpec(key=16, min=0, max=0), 17: ApiVersionsSpec(key=17, min=0, max=0), 18: ApiVersionsSpec(key=18, min=0, max=0), 19: ApiVersionsSpec(key=19, min=0, max=0)}
DEBUG:pykafka.cluster:Updating cluster, attempt 1/3
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 101: 172.17.112.191:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
DEBUG:pykafka.cluster:Discovered broker id 102: 172.17.112.192:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.192:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.192:9092
DEBUG:pykafka.cluster:Discovered broker id 103: 172.17.112.193:9092
DEBUG:pykafka.connection:Connecting to 172.17.112.193:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.193:9092
INFO:pykafka.cluster:Discovered 2 topics
DEBUG:pykafka.cluster:Discovered topic 'product_incoming'
DEBUG:pykafka.connection:Connecting to 172.17.112.191:9092
DEBUG:pykafka.connection:Successfully connected to 172.17.112.191:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 24 partitions
DEBUG:pykafka.topic:Adding partition product_incoming/0
DEBUG:pykafka.topic:Adding partition product_incoming/1
DEBUG:pykafka.topic:Adding partition product_incoming/2
DEBUG:pykafka.topic:Adding partition product_incoming/3
DEBUG:pykafka.topic:Adding partition product_incoming/4
DEBUG:pykafka.topic:Adding partition product_incoming/5
DEBUG:pykafka.topic:Adding partition product_incoming/6
DEBUG:pykafka.topic:Adding partition product_incoming/7
DEBUG:pykafka.topic:Adding partition product_incoming/8
DEBUG:pykafka.topic:Adding partition product_incoming/9
DEBUG:pykafka.topic:Adding partition product_incoming/10
DEBUG:pykafka.topic:Adding partition product_incoming/11
DEBUG:pykafka.topic:Adding partition product_incoming/12
DEBUG:pykafka.topic:Adding partition product_incoming/13
DEBUG:pykafka.topic:Adding partition product_incoming/14
DEBUG:pykafka.topic:Adding partition product_incoming/15
DEBUG:pykafka.topic:Adding partition product_incoming/16
DEBUG:pykafka.topic:Adding partition product_incoming/17
DEBUG:pykafka.topic:Adding partition product_incoming/18
DEBUG:pykafka.topic:Adding partition product_incoming/19
DEBUG:pykafka.topic:Adding partition product_incoming/20
DEBUG:pykafka.topic:Adding partition product_incoming/21
DEBUG:pykafka.topic:Adding partition product_incoming/22
DEBUG:pykafka.topic:Adding partition product_incoming/23
<pykafka.topic.Topic at 0x2593c90 (name=product_incoming)>
INFO:pykafka.producer:Starting new produce worker for broker 101
INFO:pykafka.producer:Starting new produce worker for broker 102
INFO:pykafka.producer:Starting new produce worker for broker 103
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
DEBUG:pykafka.producer:Sending 1 messages to broker 101
WARNING:pykafka.producer:Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Successfully sent 0/1 messages to broker 101
ERROR:pykafka.producer:Message not delivered!! UnknownError('Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.',)
Traceback (most recent call last):
File "pykafka_test.py", line 13, in
producer.produce(json.dumps({'tbl': 'product', 'ids': [239981], 'action': 'u', 'data': {'raw_price': 100, 'zk_price': 90}}))
File "/usr/lib64/python2.7/site-packages/pykafka/producer.py", line 354, in produce
raise exc
pykafka.exceptions.UnknownError: Produce request for product_incoming/1 to 172.17.112.191:9092 failed with error code -1.
DEBUG:pykafka.producer:Finalising <pykafka.producer.Producer at 0x259a310>
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.handlers:RequestHandler worker: exiting cleanly
INFO:pykafka.producer:Worker exited for broker 172.17.112.192:9092
INFO:pykafka.producer:Worker exited for broker 172.17.112.193:9092
INFO:pykafka.producer:Worker exited for broker 172.17.112.191:9092
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
C:\project\kafka-java-demo\vpc\src\main\java\com\aliyun\openservices\kafka\ons\KafkaConsumerDemo.java:51:67
[java: cannot access java.time.Duration
class file for java.time.Duration not found]