I tried to use this for an IT that i have been trying. I am using 2.13 scala and the version of kafka libs are using 5.5.0-ccs. So i ended up using 5.5.0.1 version of embedded kafka. It seems to be booting up alright and i do see netstat happening initially. The SR dies midway though
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
| => nmax.schemas.per.subject = 1000kaRedisSyncIT 0s
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-05-21 23:23:25,072] INFO KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
| => nmax.schemas.per.subject = 1000kaRedisSyncIT 0s
basic.auth.credentials.source = URL
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
**Getting connected to localhost:6001
[2020-05-21 23:23:42,134] INFO SchemaRegistryConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
authentication.method = NONE
| => nauthentication.realm = ync.KafkaRedisSyncIT 17s
authentication.roles = [*]
authentication.skip.paths = []
avro.compatibility.level =
compression.enable = true
debug = false
host.name = 192.168.0.152
idle.timeout.ms = 30000
inter.instance.headers.whitelist = []
inter.instance.protocol = http
kafkastore.bootstrap.servers = [localhost:6001]
kafkastore.connection.url =
kafkastore.group.id =
kafkastore.init.timeout.ms = 60000
kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
kafkastore.sasl.kerberos.min.time.before.relogin = 60000
kafkastore.sasl.kerberos.service.name =
kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
kafkastore.sasl.mechanism = GSSAPI
kafkastore.security.protocol = PLAINTEXT
kafkastore.ssl.cipher.suites =
kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
kafkastore.ssl.endpoint.identification.algorithm =
kafkastore.ssl.key.password = [hidden]
kafkastore.ssl.keymanager.algorithm = SunX509
kafkastore.ssl.keystore.location =
kafkastore.ssl.keystore.password = [hidden]
kafkastore.ssl.keystore.type = JKS
kafkastore.ssl.protocol = TLS
kafkastore.ssl.provider =
kafkastore.ssl.trustmanager.algorithm = PKIX
kafkastore.ssl.truststore.location =
kafkastore.ssl.truststore.password = [hidden]
kafkastore.ssl.truststore.type = JKS
kafkastore.timeout.ms = 500
kafkastore.topic = _schemas
kafkastore.topic.replication.factor = 3
kafkastore.write.max.retries = 5
kafkastore.zk.session.timeout.ms = 30000
listeners = [http://localhost:6002]
master.eligibility = true
metric.reporters = []
metrics.jmx.prefix = kafka.schema.registry
metrics.num.samples = 2
metrics.sample.window.ms = 30000
metrics.tag.map = []
mode.mutability = false
port = 8081
request.logger.name = io.confluent.rest-utils.requests
resource.extension.class = []
resource.extension.classes = []
resource.static.locations = []
response.mediatype.default = application/vnd.schemaregistry.v1+json
response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
rest.servlet.initializor.classes = []
schema.compatibility.level = backward
schema.providers = []
schema.registry.group.id = schema-registry
schema.registry.inter.instance.protocol =
schema.registry.resource.extension.class = []
schema.registry.zk.namespace = schema_registry
shutdown.graceful.ms = 1000
ssl.cipher.suites = []
ssl.client.auth = false
ssl.client.authentication = NONE
ssl.enabled.protocols = []
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm =
ssl.keystore.location =
ssl.keystore.password = [hidden]
ssl.keystore.reload = false
ssl.keystore.type = JKS
ssl.keystore.watch.location =
ssl.protocol = TLS
ssl.provider =
ssl.trustmanager.algorithm =
ssl.truststore.location =
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
websocket.path.prefix = /ws
websocket.servlet.initializor.classes = []
zookeeper.set.acl = false
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:347)
[2020-05-21 23:23:42,153] INFO Logging initialized @44049ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169)
[2020-05-21 23:23:42,232] INFO Adding listener: http://localhost:6002 (io.confluent.rest.ApplicationServer:344)
[2020-05-21 23:23:42,308] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://localhost:6001 (io.confluent.kafka.schemaregistry.storage.KafkaStore:108)
[2020-05-21 23:23:42,309] INFO Registering schema provider for AVRO: io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210)
[2020-05-21 23:23:42,309] INFO Registering schema provider for JSON: io.confluent.kafka.schemaregistry.json.JsonSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210).smartdrive.refdata.sync.KafkaRedisSyncIT 17s
[2020-05-21 23:23:42,310] INFO Registering schema provider for PROTOBUF: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:210)
[2020-05-21 23:23:42,345] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:193)
[2020-05-21 23:23:42,346] WARN Creating the schema topic _schemas using a replication factor of 1, which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:203)
[2020-05-21 23:23:42,513] INFO Kafka store reader thread starting consumer (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:110)
[2020-05-21 23:23:42,527] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:144)
[2020-05-21 23:23:42,528] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:42,623] INFO Wait to catch up until the offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:304)
[2020-05-21 23:23:42,640] INFO Joining schema registry with Kafka-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:297)
[2020-05-21 23:23:45,817] INFO Finished rebalance with master election result: Assignment{version=1, error=0, master='sr-1-5e328d55-d8bd-41dc-b711-cfb93e166265', masterIdentity=version=1,host=192.168.0.152,port=6002,scheme=http,masterEligibility=true} (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector:228)
[2020-05-21 23:23:45,821] INFO Wait to catch up until the offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:304)
[2020-05-21 23:23:45,991] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_242-b08 (org.eclipse.jetty.server.Server:359)
[2020-05-21 23:23:46,036] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:333)
[2020-05-21 23:23:46,037] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:338)
[2020-05-21 23:23:46,038] INFO node0 Scavenging every 660000ms (org.eclipse.jetty.server.session:140)
A provider io.confluent.kafka.schemaregistry.rest.resources.ConfigResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ConfigResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.CompatibilityResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.CompatibilityResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.ServerMetadataResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ServerMetadataResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.ModeResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.ModeResource will be ignored.
A provider io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource will be ignored.
[2020-05-21 23:23:46,587] INFO HV000001: Hibernate Validator 6.0.17.Final (org.hibernate.validator.internal.util.Version:21)
[2020-05-21 23:23:46,841] INFO JVM Runtime does not support Modules (org.eclipse.jetty.util.TypeUtil:201)
[2020-05-21 23:23:46,842] INFO Started o.e.j.s.ServletContextHandler@745c35cf{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:825)
[2020-05-21 23:23:46,867] INFO Started o.e.j.s.ServletContextHandler@7079c42e{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:825)
[2020-05-21 23:23:46,881] INFO Started NetworkTrafficServerConnector@4e1c56f1{HTTP/1.1,[http/1.1]}{localhost:6002} (org.eclipse.jetty.server.AbstractConnector:330)
[2020-05-21 23:23:46,881] INFO Started @48777ms (org.eclipse.jetty.server.Server:399)
[2020-05-21 23:23:46,891] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
| => nproxy.port = -1efdata.sync.KafkaRedisSyncIT 22s
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:6002]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-05-21 23:23:46,891] INFO KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [http://localhost:6002]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-05-21 23:23:46,952] INFO Stopped NetworkTrafficServerConnector@4e1c56f1{HTTP/1.1,[http/1.1]}{localhost:6002} (org.eclipse.jetty.server.AbstractConnector:380)
[2020-05-21 23:23:46,952] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:158)
[2020-05-21 23:23:46,956] INFO Stopped o.e.j.s.ServletContextHandler@7079c42e{/ws,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,960] INFO Stopped o.e.j.s.ServletContextHandler@745c35cf{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,962] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:1084)
[2020-05-21 23:23:46,962] INFO [kafka-store-reader-thread-_schemas]: Shutting down (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Stopped (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Shutdown completed (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,965] INFO KafkaStoreReaderThread shutdown complete. (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:253)
[2020-05-21 23:23:46,967] ERROR Unexpected exception in schema registry group processing thread (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector:200)
org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at io.confluent.kafka.schemaregistry.masterelector.kafka.SchemaRegistryCoordinator.poll(SchemaRegistryCoordinator.java:120)
at io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector$1.run(KafkaGroupMasterElector.java:197)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-05-21 23:23:46,952] INFO Stopped NetworkTrafficServerConnector@4e1c56f1{HTTP/1.1,[http/1.1]}{localhost:6002} (org.eclipse.jetty.server.AbstractConnector:380)
[2020-05-21 23:23:46,952] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:158)
[2020-05-21 23:23:46,956] INFO Stopped o.e.j.s.ServletContextHandler@7079c42e{/ws,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,960] INFO Stopped o.e.j.s.ServletContextHandler@745c35cf{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-05-21 23:23:46,962] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:1084)
[2020-05-21 23:23:46,962] INFO [kafka-store-reader-thread-_schemas]: Shutting down (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Stopped (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,963] INFO [kafka-store-reader-thread-_schemas]: Shutdown completed (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2020-05-21 23:23:46,965] INFO KafkaStoreReaderThread shutdown complete. (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:253)
[2020-05-21 23:23:46,967] ERROR Unexpected exception in schema registry group processing thread (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector:200)
org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at io.confluent.kafka.schemaregistry.masterelector.kafka.SchemaRegistryCoordinator.poll(SchemaRegistryCoordinator.java:120)
at io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector$1.run(KafkaGroupMasterElector.java:197)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.TimeoutException: Topic topic_test not present in metadata after 60000 ms.
The End {0}
org.apache.kafka.common.errors.TimeoutException: Topic topic_test not present in metadata after 60000 ms.
"RefDataKafkaVerticle" should "start kafka and sink a message" in {
implicit val kafkaEmbeddedConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(zooKeeperPort = 6000, kafkaPort = 6001 , schemaRegistryPort = 6002)
kafkaEmbeddedConfig.customSchemaRegistryProperties
I have since then tried , changing to broadcast IP (0.0.0.0) , tweaking kafka versions to no avail.