
embedded-kafka's Introduction

embedded-kafka


A library that provides an in-memory Kafka instance to run your tests against.

Inspired by kafka-unit.

Version compatibility matrix

embedded-kafka is available on Maven Central, compiled for Scala 2.12, 2.13 and Scala 3 (since v3.4.0.1).

Versions match the version of Kafka they're built against.

Important known limitation (prior to v2.8.0)

Prior to v2.8.0, Kafka core inlined the Scala library, so you couldn't use a Scala patch version different from the one Kafka used to compile its jars!

Breaking change: new package name

From v2.8.0 onwards, the package name has been updated to reflect the library's group id (i.e. io.github.embeddedkafka).

Aliases to the old package name have been added, along with a one-time Scalafix rule to ensure the smoothest migration.
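
In practice, the only required change is the import. For example:

// up to v2.7.x
import net.manub.embeddedkafka.EmbeddedKafka

// from v2.8.0 onwards
import io.github.embeddedkafka.EmbeddedKafka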

embedded-kafka

How to use

  • In your build.sbt file add the following dependency (replace x.x.x with the appropriate version): "io.github.embeddedkafka" %% "embedded-kafka" % "x.x.x" % Test
  • Have your class extend the EmbeddedKafka trait.
  • Enclose the code that needs a running instance of Kafka within the withRunningKafka closure.

An example, using ScalaTest:

class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      withRunningKafka {
        // ... code goes here
      }
    }
  }
}
  • In-memory ZooKeeper and Kafka will be instantiated on ports 6000 and 6001 respectively, and automatically shut down at the end of the test.

Use without the withRunningKafka method

An EmbeddedKafka companion object is provided for use without extending the EmbeddedKafka trait. ZooKeeper and Kafka can be started and stopped programmatically. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka and ZooKeeper for every test.

class MySpec extends AnyWordSpecLike with Matchers {

  "runs with embedded kafka" should {

    "work" in {
      EmbeddedKafka.start()

      // ... code goes here

      EmbeddedKafka.stop()
    }
  }
}

Please note that, to avoid Kafka instances not shutting down properly, it's recommended to call EmbeddedKafka.stop() in an after block or similar teardown logic.
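
A minimal sketch of that pattern, using ScalaTest's BeforeAndAfterAll with the default configuration:

class MySpec extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {

  // start a single broker once for the whole suite...
  override def beforeAll(): Unit = EmbeddedKafka.start()

  // ...and always shut it down, even if a test fails
  override def afterAll(): Unit = EmbeddedKafka.stop()

  "runs with embedded kafka" should {
    "work across multiple tests" in {
      // ... code goes here
    }
  }
}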

Configuration

It's possible to change the ports on which ZooKeeper and Kafka are started by providing an implicit EmbeddedKafkaConfig:

class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

  "runs with embedded kafka on a specific port" should {

    "work" in {
      implicit val config = EmbeddedKafkaConfig(kafkaPort = 12345)

      withRunningKafka {
        // now a kafka broker is listening on port 12345
      }
    }
  }
}

If you want to run ZooKeeper and Kafka on arbitrary available ports, you can use the withRunningKafkaOnFoundPort method. This makes tests more reliable, especially when running tests in parallel or on machines where other tests or services may be using ports you can't control.

class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

  "runs with embedded kafka on arbitrary available ports" should {

    "work" in {
      val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

      withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
        // now a kafka broker is listening on actualConfig.kafkaPort
        publishStringMessageToKafka("topic", "message")
        consumeFirstStringMessageFrom("topic") shouldBe "message"
      }
    }
  }
}

The same implicit EmbeddedKafkaConfig is used to define custom consumer and producer properties:

class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

  "runs with custom producer and consumer properties" should {
    "work" in {
      val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000",
        "message.max.bytes" -> "2000000")

      val customProducerConfig = Map("max.request.size" -> "2000000")
      val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000")

      implicit val customKafkaConfig = EmbeddedKafkaConfig(
        customBrokerProperties = customBrokerConfig,
        customProducerProperties = customProducerConfig,
        customConsumerProperties = customConsumerConfig)

      withRunningKafka {
        // now a kafka broker is listening on the default port with the custom properties applied
      }
    }
  }
}

This works for withRunningKafka, withRunningKafkaOnFoundPort, and EmbeddedKafka.start().

It is also possible to provide custom properties to the broker when starting Kafka. EmbeddedKafkaConfig has a customBrokerProperties field which can be used to provide extra properties as a Map[String, String]. Those properties will be added to the broker configuration; be careful, as some properties are set by the library itself, and in case of conflict the customBrokerProperties values take precedence. Please look at the source code to see what these properties are.

Utility methods

The EmbeddedKafka trait also provides some utility methods for interacting with the embedded Kafka instance, in order to set up preconditions or verifications in your specs:

def publishToKafka(topic: String, message: String): Unit

def consumeFirstMessageFrom(topic: String): String

def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit
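
For instance, a short sketch combining them inside a spec that extends EmbeddedKafka (the topic name and settings are illustrative, and the String-typed publish/consume variants are used):

withRunningKafka {
  // create the topic up front with 3 partitions and a custom retention
  createCustomTopic("events", Map("retention.ms" -> "3600000"), partitions = 3, replicationFactor = 1)

  publishStringMessageToKafka("events", "hello")
  consumeFirstStringMessageFrom("events") shouldBe "hello"
}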

Custom producers

Given implicit Serializers for each type and an EmbeddedKafkaConfig, it is possible to use withProducer[A, B, R] { your code here } where R is the return type of the code.

For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.

Custom consumers

Given implicit Deserializers for each type and an EmbeddedKafkaConfig, it is possible to use withConsumer[A, B, R] { your code here } where R is the return type of the code.

Loan methods example

A test using the loan methods can be as simple as this:

  import java.util.Collections
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
  import scala.concurrent.duration._
  import scala.jdk.CollectionConverters._ // on Scala 2.12, use scala.collection.JavaConverters._ instead

  // `eventually` and the matchers come from ScalaTest's Eventually and Matchers traits
  implicit val serializer: Serializer[String]     = new StringSerializer()
  implicit val deserializer: Deserializer[String] = new StringDeserializer()
  val key                                         = "key"
  val value                                       = "value"
  val topic                                       = "loan_method_example"

  EmbeddedKafka.withProducer[String, String, Unit](producer =>
    producer.send(new ProducerRecord[String, String](topic, key, value)))

  EmbeddedKafka.withConsumer[String, String, Assertion](consumer => {
    consumer.subscribe(Collections.singletonList(topic))

    eventually {
      val records = consumer.poll(java.time.Duration.ofMillis(1.seconds.toMillis)).asScala
      records should have size 1
      records.head.key shouldBe key
      records.head.value shouldBe value
    }
  })

embedded-kafka-streams

A library that builds on top of embedded-kafka to offer easy testing of Kafka Streams.

It takes care of instantiating and starting your streams as well as closing them after running your test-case code.

How to use

  • In your build.sbt file add the following dependency (replace x.x.x with the appropriate version): "io.github.embeddedkafka" %% "embedded-kafka-streams" % "x.x.x" % Test
  • Have a look at the example test
  • For most cases, have your class extend the EmbeddedKafkaStreams trait. This offers both stream management and easy loaning of producers and consumers for asserting the resulting messages in output/sink topics.
  • Use EmbeddedKafkaStreams.runStreams together with EmbeddedKafka.withConsumer and EmbeddedKafka.withProducer. This allows you to create your own consumers of custom types, as seen in the example test and in the sketch after this list.
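
A minimal sketch of this pattern, assuming the default EmbeddedKafkaConfig (topic names and serdes are illustrative):

class MyStreamsSpec extends AnyWordSpecLike with Matchers with EmbeddedKafkaStreams {

  "a pass-through topology" should {
    "forward messages from the input topic to the output topic" in {
      val (inTopic, outTopic) = ("in", "out")
      val stringSerde         = Serdes.String()

      val builder = new StreamsBuilder
      builder
        .stream(inTopic, Consumed.`with`(stringSerde, stringSerde))
        .to(outTopic, Produced.`with`(stringSerde, stringSerde))

      // runStreams creates the topics, starts the streams, and closes them afterwards
      runStreams(Seq(inTopic, outTopic), builder.build()) {
        publishStringMessageToKafka(inTopic, "message")
        consumeFirstStringMessageFrom(outTopic) shouldBe "message"
      }
    }
  }
}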

embedded-kafka-connect

A library that builds on top of embedded-kafka to offer easy testing of Kafka Connect.

It takes care of instantiating and starting a Kafka Connect server as well as closing it after running your test-case code.

How to use

  • In your build.sbt file add the following dependency (replace x.x.x with the appropriate version): "io.github.embeddedkafka" %% "embedded-kafka-connect" % "x.x.x" % Test
  • Have a look at the example test
  • For most cases, have your class extend the EmbeddedKafkaConnect trait.
  • Use EmbeddedKafkaConnect.startConnect. This allows you to start a Kafka Connect server to interact with as seen in the example test.

embedded-kafka's People

Contributors

adamosloizou, crankydillo, dependabot[bot], dichotomia, elfolink, francescopellegrini, galarragas, kciesielski, l15k4, lissovski, lodamar, lucapertile, lucaqz, manub, max5599, mdulac, mihaisoloi, mikemintz, mmolimar, msaunier-poctu, mtranter, nequissimus, nguyenuy, oliverlockwood, scala-steward, seglo, stefanobaghino, sullis, timgentonzo, v-gerasimov


embedded-kafka's Issues

ServerCnxnFactory is missing from the classpath

Hello. We tried to upgrade to embedded-kafka 3.2.0 (from 2.7.0) and encountered the error below.

"""
Symbol 'type org.apache.zookeeper.server.ServerCnxnFactory' is missing from the classpath.
This symbol is required by 'method io.github.embeddedkafka.ops.ZooKeeperOps.startZooKeeper'.
Make sure that type ServerCnxnFactory is in your classpath and check for conflicting dependencies with -Ylog-classpath.
A full rebuild may help if 'ZooKeeperOps.class' was compiled against an incompatible version of org.apache.zookeeper.server.
"""

We can't seem to figure out the cause.

Java: 1.8
Scala: 2.13.8
Gradle: 6.7

Version 2.4.1 giving topic "not present in metadata after 60000 ms"

After upgrading to embedded-kafka 2.4.1 (as well as upgrading the Kafka client to the same version) we are seeing errors such as:

Topic anytopic not present in metadata after 60000 ms

If we keep the kafka client at 2.4.1 and switch embedded-kafka back to 2.4.0 we do not see the error.

We are using withRunningKafka to start embedded kafka, but have tried to explicitly start/stop to no avail. We are using default configuration.

Scala version: 2.12.8
SBT version: 1.3.9
Kafka client version: 2.4.1
embedded kafka version: 2.4.1

Rename package?

I have been using this library since the original was deprecated, and it works great, thank you!

Just wanted to ask if there is any plan to rename the package to match the project domain at all?

It's a little weird right now adding the dependency and having a different import statement for the classes.

"io.github.embeddedkafka" %% "embedded-kafka" % "2.5.0" % "test,it"
import net.manub.embeddedkafka._

Several partitions

Hi!
How do I specify which partition to consume from, without using EmbeddedKafka.withConsumer[String, String, Assertion]? Is it possible to do it with consumeNumberStringMessagesFrom?

Thank you in advance!

Evaluate using Kafka TopologyTestDriver

Kafka 2.0 included a new package for testing topologies: the driver captures the result records and allows querying its embedded state stores.

Would it make sense for us to rely on it to test Kafka Streams topologies, even though it does not natively integrate with embedded Kafka or the embedded Confluent Schema Registry?

Build with Scala 2.13

Akka is now published for Scala 2.13.0-M5.
It would be great to get an embedded-kafka version for it.

Deserializer custom types

Hey, if I have this type:

sealed trait OperationType extends Product with Serializable

final case object NewSubscription    extends OperationType
final case object DeleteSubscription extends OperationType

final case class Organization(organization: String) extends AnyVal

final case class MessageEvent(
    operationType: OperationType,
    organization: Organization,
    repository: Repository
)

I am trying to create a custom deserializer for the MessageEvent type, just like the one we have for String.
Is there an API for this, please?

Ref - https://github.com/embeddedkafka/embedded-kafka/blob/master/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala#L126

No reply, so closing this issue, but I would still love an answer.

Deleting a topic fails when running on Windows in Github Actions

Hi,

I'm working on an app that is available for Linux, macOS and Windows so I'm running my tests on the three OS.

In my tests, I create a topic in an embedded-kafka. That works perfectly fine for Linux and macOS but on Windows, I have the following error:

2021-11-11 07:30:59 [data-plane-kafka-request-handler-1] ERROR kafka.server.LogDirFailureChannel - Error while renaming dir for topic-002-0 in log dir C:\Users\RUNNER~1\AppData\Local\Temp\kafka-logs6847069385745433788
java.nio.file.AccessDeniedException: C:\Users\RUNNER~1\AppData\Local\Temp\kafka-logs6847069385745433788\topic-002-0 -> C:\Users\RUNNER~1\AppData\Local\Temp\kafka-logs6847069385745433788\topic-002-0.7cdd8b3b5a8b498a8c39aec442563614-delete
	at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
	at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
	at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
	at java.base/java.nio.file.Files.move(Files.java:1422)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:940)
	at kafka.log.Log.$anonfun$renameDir$2(Log.scala:1098)
	at kafka.log.Log.renameDir(Log.scala:2482)
	at kafka.log.LogManager.asyncDelete(LogManager.scala:991)
	at kafka.log.LogManager.$anonfun$asyncDelete$3(LogManager.scala:1026)
	at scala.Option.foreach(Option.scala:437)
	at kafka.log.LogManager.$anonfun$asyncDelete$2(LogManager.scala:1024)
	at kafka.log.LogManager.$anonfun$asyncDelete$2$adapted(LogManager.scala:1022)
	at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:435)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:361)
	at kafka.log.LogManager.asyncDelete(LogManager.scala:1022)
	at kafka.server.ReplicaManager.stopPartitions(ReplicaManager.scala:489)
	at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:427)
	at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:284)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:172)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: java.nio.file.AccessDeniedException: C:\Users\RUNNER~1\AppData\Local\Temp\kafka-logs6847069385745433788\topic-002-0 -> C:\Users\RUNNER~1\AppData\Local\Temp\kafka-logs6847069385745433788\topic-002-0.7cdd8b3b5a8b498a8c39aec442563614-delete
		at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
		at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
		at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
		at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
		at java.base/java.nio.file.Files.move(Files.java:1422)
		at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:937)
		... 16 common frames omitted

Any idea how to fix this issue? 🤔

Support for kafka transactions?

Hi,
I'm using Alpakka Transactions (https://doc.akka.io/docs/alpakka-kafka/current/transactions.html) to consume from one topic and produce to another. When I try to test this with embedded-kafka, it seems it never produces to the target topic and/or commits the consumer offset. Using the admin client, I cannot find the group id of the consumer. When I run exactly the same code with a local Kafka instance (so not the embedded Kafka), it works fine.
Does embedded-kafka support transactions?
Patrice

scala/math/Ordering not found after 2.5.0 update

I am using embedded-kafka 2.4.1.1 and it works like a charm. I have tried to update the dependency to 2.5.0, 2.5.1 or 2.6.0, but I get this stacktrace with the newer versions:

[info]   Cause: java.lang.ClassNotFoundException: scala.math.Ordering$$anon$7
[info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[info]   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
[info]   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[info]   at kafka.api.ApiVersion$.orderingByVersion(ApiVersion.scala:45)
[info]   at kafka.api.ApiVersion.compare(ApiVersion.scala:141)
[info]   at kafka.api.ApiVersion.compare$(ApiVersion.scala:140)
[info]   at kafka.api.KAFKA_2_6_IV0$.compare(ApiVersion.scala:348)
[info]   at kafka.api.KAFKA_2_6_IV0$.compare(ApiVersion.scala:348)
[info]   at scala.math.Ordered.$greater$eq(Ordered.scala:91)

I am using Scala 2.12.12

Redefine release process

Travis pipeline is configured to publish tagged releases automatically.

However, sbt release will also publish built artifacts.

I think we need to change the behavior by redefining the release process, in order to be able to run sbt release locally just to bump the project version:

releaseProcess := Seq[ReleaseStep](
  setReleaseVersion,
  commitReleaseVersion,
  tagRelease,
  setNextVersion,
  commitNextVersion,
  pushChanges
)

Can't run embedded server with kafka-avro-serializer dep

Hello,
I'm trying to write tests for my producer; however, I'm not having any luck. I am simply calling EmbeddedKafka.start(), but this fails immediately with the exception below:

Exception Stack

A needed class was not found. This could be due to an error in your runpath. Missing class: org/apache/kafka/common/metrics/MetricsContext
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/MetricsContext
	at net.manub.embeddedkafka.ops.KafkaOps.startKafka(kafkaOps.scala:52)
	at net.manub.embeddedkafka.ops.KafkaOps.startKafka$(kafkaOps.scala:26)
	at net.manub.embeddedkafka.EmbeddedKafka$.startKafka(EmbeddedKafka.scala:52)
	at net.manub.embeddedkafka.ops.RunningKafkaOps.startKafka(kafkaOps.scala:81)
	at net.manub.embeddedkafka.ops.RunningKafkaOps.startKafka$(kafkaOps.scala:74)
	at net.manub.embeddedkafka.EmbeddedKafka$.startKafka(EmbeddedKafka.scala:52)
	at net.manub.embeddedkafka.EmbeddedKafka$.start(EmbeddedKafka.scala:70)
	at whoseturn.test.support.kafka.KafkaSupport.beforeAll(KafkaSupport.scala:10)
	at whoseturn.test.support.kafka.KafkaSupport.beforeAll$(KafkaSupport.scala:9)
	at whoseturn.web.todos.TodoFeedItemProducerSpec.beforeAll(TodoFeedItemProducerSpec.scala:14)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at whoseturn.web.todos.TodoFeedItemProducerSpec.run(TodoFeedItemProducerSpec.scala:14)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 27 more

After some time googling, it seems like a dependency problem, so I removed my dependencies one at a time until I stopped getting this error. It turns out that if I have kafka-avro-serializer as a dependency, I get this error. I don't really understand why and don't know what a possible solution could be. Any suggestions as to what I can do? I would really like to use both libraries if possible. Below are my build.sbt dependencies.

build.sbt

version := "0.1"

scalaVersion := "2.13.3"
resolvers += "Confluent" at "https://packages.confluent.io/maven/"
libraryDependencies ++= List(
      "org.typelevel"              %% "cats-core"            % "2.1.1",
      "com.typesafe.akka"          %% "akka-stream"          % "2.6.8",
      "org.typelevel"              %% "cats-effect"          % "2.1.4",
      "com.typesafe.scala-logging" %% "scala-logging"        % "3.9.2",
      "org.slf4j"                  % "slf4j-api"             % "1.7.5",
      "org.slf4j"                  % "slf4j-simple"          % "1.7.5",
      "org.apache.commons"         % "commons-io"            % "1.3.2",
      "org.apache.avro"            % "avro"                  % "1.9.2",
      "org.apache.kafka"           % "kafka-clients"         % "2.6.0",
      "io.github.embeddedkafka"    %% "embedded-kafka"       % "2.6.0",
      "io.confluent"               % "kafka-avro-serializer" % "5.5.1"
)

Upgrading from 2.7.0 to 2.8.0 - NoSuchMethodError

Since upgrading from 2.7.0 to 2.8.0, and having changed all imports from net.manub... to io.github..., we have seen the error below when trying to use withRunningKafka. Is there something else we need to do to migrate from 2.7.0 to 2.8.0?

[error] java.lang.NoSuchMethodError: 'boolean kafka.server.KafkaServer$.$lessinit$greater$default$4()'
[error]         at io.github.embeddedkafka.ops.KafkaOps.startKafka(kafkaOps.scala:52)
[error]         at io.github.embeddedkafka.ops.KafkaOps.startKafka$(kafkaOps.scala:26)

Remove link to original fork

GitHub doesn't allow searching within forked repositories.


Can the link be removed so this repository can be searched?

Embedded Kafka Message From Previous Test Exists In New Test

I'm using embedded-kafka 2.1.1 with sbt 1.3.0 and Scala 2.11.12.
I have a test that should be ignored:


\"publish corresponding message to Kafka\" in {

      val uri = new URI(\"http\", \"localhost:9000", "/v1/data/datasets/", """datasetIds=["dataset1","dataset2"]""", null)

      val aRequest = play.api.test.FakeRequest(method = "DELETE", uri = uri.toASCIIString, headers = play.api.test.FakeHeaders(), body = AnyContentAsEmpty)

      when(mockMDSRepo.deleteMetaDatasets(Set("dataset1", "dataset2"))) thenReturn DeleteSuccess

      implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
      runningwithkafka
        val kafkaPublisher = new KafkaPublisherImpl
        val dsController = new MetaDatasetController(mockMDSRepo, mockSchemaRepo, mockJobRepo, mockAppRepo, kafkaPublisher) {
          override def controllerComponents: ControllerComponents = Helpers.stubControllerComponents()
        }
        val results = dsController.deleteDatasets().apply(aRequest)
        status(results) mustBe 204
        contentType(results) mustBe None
        consumeFirstStringMessageFrom(bpa.kafkaTopic) mustBe KafkaMessages.datasetDeletedMessage1
        consumeFirstStringMessageFrom(bpa.kafkaTopic) mustBe KafkaMessages.datasetDeletedMessage2
      EmbeddedKafka.stop()
    }

However, in a later test:

 "publish proper dataset created message" in {
      when(mockAppRepo.getAllBPAApplications) thenReturn Set(bpa)
      implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
      withRunningKafka {
        val kafkaPublisher = new KafkaPublisherImpl
        val kafkaUtil = new KafkaUtil(kafkaPublisher, mockAppRepo)
        kafkaUtil.publisDatasetCreatedMessage(testMetadataset)
        consumeFirstStringMessageFrom(bpa.kafkaTopic) mustBe KafkaMessages.datasetCreatedMessage1
      }
    }

the test fails, as consumeFirstStringMessageFrom(bpa.kafkaTopic) returns KafkaMessages.datasetDeletedMessage1 (from the ignored test) instead of the testMetadataset message that was supposed to be put into Kafka.

I tried the test classes with BeforeAndAfterEach, starting embedded Kafka in beforeEach and stopping it in afterEach, but the test would still fail.

java.lang.NoClassDefFoundError: org/apache/kafka/server/authorizer/Authorizer when doing EmbeddedKafka.start()

When I am trying to write a unit test for Spark streaming in Scala with Kafka, I am getting the following error. Can you please suggest the right combination of dependencies, or how we can resolve this?

A needed class was not found. This could be due to an error in your runpath. Missing class: org/apache/kafka/server/authorizer/Authorizer
java.lang.NoClassDefFoundError: org/apache/kafka/server/authorizer/Authorizer
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:520)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at net.manub.embeddedkafka.ops.KafkaOps$class.startKafka(kafkaOps.scala:36)

Kafka dependencies in pom.xml:

    <dependency>
        <groupId>com.github.benfradet</groupId>
        <artifactId>spark-kafka-0-10-writer_2.11</artifactId>
        <version>0.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/io.github.embeddedkafka/embedded-kafka-streams -->
    <dependency>
        <groupId>io.github.embeddedkafka</groupId>
        <artifactId>embedded-kafka-streams_2.11</artifactId>
        <version>2.4.1.1</version>
    </dependency>

Thank You,
Srinivas

Support for SSL

Hi, thanks for the great job, the library is really useful.
One question: do you know if the library supports running the embedded Kafka with SSL?

Regards

Zookeeper dependency issue for version 2.2.1

Issue:
I'm getting the exception java.lang.ClassNotFoundException: org.apache.zookeeper.AsyncCallback$MultiCallback while running:

        <dependency>
            <groupId>io.github.embeddedkafka</groupId>
            <artifactId>embedded-kafka_2.11</artifactId>
            <version>2.2.1</version>
            <scope>test</scope>
        </dependency>

This is because the org.apache.zookeeper.AsyncCallback API was added in ZooKeeper 3.4.7, but the version above brings in ZooKeeper 3.4.6, which causes the ClassNotFoundException.

Workaround:
Exclude the zookeeper dependency from embedded-kafka and add the right version explicitly:

        <dependency>
            <groupId>io.github.embeddedkafka</groupId>
            <artifactId>embedded-kafka_2.11</artifactId>
            <version>2.2.1</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.7</version>
            <scope>test</scope>
        </dependency>

Topologies with groupBy and aggregate don't work.

Hi, I was trying to test topologies that do a groupBy and an aggregation in between, which creates internal topics in Kafka.

Here is a test to reproduce the issue:

package net.manub.embeddedkafka.schemaregistry.streams

import java.lang
import java.time.Instant

import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.TestAvroClass
import net.manub.embeddedkafka.schemaregistry.avro.Codecs._
import net.manub.embeddedkafka.schemaregistry._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KStream, Produced}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec
    extends WordSpec
    with Matchers
    with EmbeddedKafkaStreamsAllInOne {

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7000,
                        zooKeeperPort = 7001,
                        schemaRegistryPort = 7002)

  val (inTopic, outTopic) = ("in", "out")

  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[lang.Long] = Serdes.Long()
  implicit val longDeserializer = Serdes.Long().deserializer()
  implicit val decoder
    : ConsumerRecord[String, lang.Long] => (String, lang.Long) = cr =>
    (cr.key(), cr.value())
  val avroSerde: Serde[TestAvroClass] = serdeFrom[TestAvroClass]

  "A Kafka streams test using Schema Registry" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilder
      val stream: KStream[String, TestAvroClass] =
        streamBuilder.stream(inTopic, Consumed.`with`(stringSerde, avroSerde))
      stream
        .peek((k, v) => println(s"${Instant.now} Before Grouping $k=$v"))
        .groupByKey()
        .count
        .toStream
        .peek((k, v) => println(s"${Instant.now} $k=$v"))
        .to(outTopic, Produced.`with`(stringSerde, longSerde))

      runStreams(Seq(inTopic, outTopic), streamBuilder.build()) {
        publishToKafka(inTopic, "hello", TestAvroClass("world"))
        publishToKafka(inTopic, "foo", TestAvroClass("bar"))
        withConsumer[String, lang.Long, Unit] {
          consumer: KafkaConsumer[String, lang.Long] =>
            val consumedMessages: List[(String, lang.Long)] =
              consumer.consumeLazily[(String, lang.Long)](outTopic).take(2).toList

            println(s"${Instant.now} after consuming")

            consumedMessages should be(Seq("hello" -> 1, "foo" -> 1))
        }
      }
    }
  }
}

I have added printlns to show how the events come out of sync:

2019-03-22T20:58:09.599Z Before Grouping hello={"name": "world"}
2019-03-22T20:58:09.602Z Before Grouping foo={"name": "bar"}
2019-03-22T20:58:12.769Z after consuming
2019-03-22T20:58:12.825Z After Grouping hello=1
2019-03-22T20:58:12.828Z After Grouping foo=1

Am I doing something wrong?

Why is zookeeper.connection.timeout.ms set?

Hi,

EmbeddedKafka sets zookeeper.connection.timeout.ms to 10 seconds. I was wondering: what is the point of setting this config explicitly? According to the Kafka documentation, the connection timeout will follow zookeeper.session.timeout.ms (default 18s) if you don't specify the connection timeout. It seems a little weird to only want to wait 10 seconds for a connection, but to be willing to wait 18 seconds between heartbeats before timing out the ZK session.

Would it make sense to not set zookeeper.connection.timeout.ms in EmbeddedKafka by default? That way, the two properties will have the same value by default.
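
In the meantime, the value can already be overridden per test through customBrokerProperties; a minimal sketch:

implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
  // align the ZooKeeper connection timeout with the default 18s session timeout
  customBrokerProperties = Map("zookeeper.connection.timeout.ms" -> "18000")
)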

Upgrade log4j dependency to supported version

At the moment, the slf4j-log4j12 binding for log4j 1.2 is being used for logging, which pulls in log4j 1.2.17 as a dependency.

"org.slf4j" % "slf4j-log4j12" % Versions.Slf4j,

Log4j 1.2 has been end-of-life since 2015, and suffers from several major vulnerabilities (for example https://www.cvedetails.com/cve/CVE-2022-23305/ and https://www.cvedetails.com/cve/CVE-2021-4104/).

Many software organizations have security policies which prohibit using libraries which depend on software with such high-visibility security issues (even in test), and as such are unable to use your library, even though they'd really like to.

Replacing the slf4j-log4j12 library with a binding for log4j 2.x (for example https://logging.apache.org/log4j/2.x/log4j-slf4j-impl/), or replacing use of log4j with another logging framework such as logback or JCL would be a good step for the security of this library.

Build for Kafka v3.0.0 client

When will a build for the Kafka v3.0.0 client be released? It looks like there is already a PR for this: #309.

Our team needs to use the new Kafka client features, but we cannot update the dependency yet, because there is no compatible embedded-kafka version.

Please provide a way to create multiple brokers

In the previous version, 2.4.0, there was a way of creating multiple brokers by making explicit calls to startZookeeper(port, dir) and startKafka(conf, dir). Now it seems to be well hidden. Could you bring this possibility back somehow?
Thanks!

Timeout when wrapping an it clause in a withRunningKafka but not the other way around

Hi,

Trying a simple test like:

    withRunningKafka {
      it("should pass") {
        publishStringMessageToKafka("topic", "message")
        consumeFirstStringMessageFrom("topic") shouldBe "message"
      }
    }

results in 10 seconds of

21:09:47.512 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:6001) could not be established. Broker may not be available.
21:09:47.512 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.NetworkClient - [Producer clientId=producer-1] Bootstrap broker localhost:6001 (id: -1 rack: null) disconnected

Followed by the test failing

net.manub.embeddedkafka.KafkaUnavailableException was thrown.
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 10000 ms.
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 10000 ms.

Unexpected exception in schema registry group processing thread org.apache.kafka.common.errors.WakeupException: null

Hey, I am seeing this error using embedded-kafka:


[warn] o.a.k.c.n.Selector - [SocketServer brokerId=0] Unexpected error from /127.0.0.1; closing connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at kafka.network.Processor.poll(SocketServer.scala:913)
	at kafka.network.Processor.run(SocketServer.scala:816)
	at java.lang.Thread.run(Thread.java:748)
[warn] o.a.k.c.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:6001) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
[warn] o.a.k.c.NetworkClient - [Producer clientId=producer-1] Bootstrap broker localhost:6001 (id: -1 rack: null) disconnected
[error] i.c.k.s.l.k.KafkaGroupLeaderElector - Unexpected exception in schema registry group processing thread
org.apache.kafka.common.errors.WakeupException: null
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
	at io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryCoordinator.poll(SchemaRegistryCoordinator.java:125)
	at io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector$1.run(KafkaGroupLeaderElector.java:200)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Any reason why this is the case?

Ask for help testing a topology with multiple consumers on multiple output topics.

Hello,
This is more a request for help than reporting an issue. I have a strange unpredictable behavior I struggle to understand.
I have a topology with several input and output topics. In unit tests, everything's fine.
My integration tests, using EmbeddedKafka, looks roughly like this:

withRunningKafka {

  createCustomTopic(topic = "in1", partitions = numParts)
  createCustomTopic(topic = "in2", partitions = numParts)

  val streams = new KafkaStreams(topology,  ...config... )
  
  publishToKafka("in1", List(...))
  publishToKafka("in2", List(...))

  streams.start()      // yes, I want to start the stream after feeding the topics (as it may happen IRL), as I want to check the proper management of timestamps among others...
   
  withConsumer[...] { 
      consumer =>
        val events = consumer.consumeLazily[...]("out1").take(10).toList
        events.size shouldBe 6
  }

  withConsumer[...] { 
      consumer =>
        val events = consumer.consumeLazily[...]("out2").take(10).toList
        events.size shouldBe 6
  }
 
  withConsumer[...] { 
      consumer =>
        val events = consumer.consumeLazily[...]("out3").take(30).toList
        events.size shouldBe 24
  }

  streams.close() // actually in a finally block
}

Now, I don't get consistent results: if I swap the last two consumer blocks, some tests fail (0 events fetched) and some don't. The first block fails intermittently.
Is there anything I'm doing wrong in this setup, or with the consumers?

Also, depending on the number of partitions numParts on the inputs, I don't get the same results. You might say this is a problem in my topology, but so far I haven't found any reason for this.

I'd be glad if you have any hints or suggestions on these. Thanks

Cross compile with Scala 2.13

Hello!

Given that the library is currently only being published for Scala 2.12, I wanted to know if there were any intentions to cross-compile it with Scala 2.13?

Kafka server shutting down too fast

I'm trying to build a very simple bootstrap app with Kafka and Spark streaming: two topics (Topic1 / Topic2) and

a Spark streaming application that reads from Topic1 and writes to Topic2 without doing any transformation.

scalaVersion := "2.12.8"
val sparkVersion = "2.4.2"
...
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "2.2.0" % "test"

Read stream:

object ReadKafkaTopic {
  def readStream(spark: SparkSession, brokers: String, topic: String): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .load()
  }
}

WriteStream:

object WriteKafkaTopic {
  def writeStream(df: DataFrame, brokers: String, topic: String): Unit = {
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", topic)
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
  }
}

I wish to test this simple application:

class MainIntegrationTests extends WordSpec with EmbeddedKafka {

  "runs with embedded kafka" should {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    val topicIn: String = "in"
    val topicOut: String = "out"

    "work" in {
      implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092)
      withRunningKafka {
        println("SPARK:")
        println(spark)

        println("Publishing to topic IN")
        publishStringMessageToKafka(topicIn, "message")

        println("READING FROM TOPIC")
        val df: DataFrame = ReadKafkaTopic.readStream(spark, "127.0.0.1:9092", topicIn)

        println("WRITING TO TOPIC")
        WriteKafkaTopic.writeStream(df, "127.0.0.1:9092", topicOut)

        println("FINALE READING!")

        println("RESULT:")
        println(EmbeddedKafka.isRunning)

        val resultTopic = consumeFirstStringMessageFrom(topicOut)
        println("Result TOPIC: " + resultTopic)
        assert(resultTopic == "message")

      }
    }

    spark.stop()
  }
}

But when I reach the println("READING FROM TOPIC") I see this in the logs:

READING FROM TOPIC
19/05/13 12:13:08 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/jay/mporium/bootstrap/spark-kafka-to-kafka-utils/spark-warehouse').
19/05/13 12:13:08 INFO SharedState: Warehouse path is 'file:/home/jay/mporium/bootstrap/spark-kafka-to-kafka-utils/spark-warehouse'.
19/05/13 12:13:08 INFO KafkaServer: [KafkaServer id=0] shutting down
19/05/13 12:13:08 INFO KafkaServer: [KafkaServer id=0] Starting controlled shutdown
19/05/13 12:13:08 INFO KafkaController: [Controller id=0] Shutting down broker 0
19/05/13 12:13:08 INFO KafkaServer: [KafkaServer id=0] Controlled shutdown succeeded
19/05/13 12:13:08 INFO ZkNodeChangeNotificationListener$ChangeEventProcessThread: [/config/changes-event-process-thread]: Shutting down
19/05/13 12:13:08 INFO ZkNodeChangeNotificationListener$ChangeEventProcessThread: [/config/changes-event-process-thread]: Stopped
19/05/13 12:13:08 INFO ZkNodeChangeNotificationListener$ChangeEventProcessThread: [/config/changes-event-process-thread]: Shutdown completed
19/05/13 12:13:08 INFO SocketServer: [SocketServer brokerId=0] Stopping socket server request processors
19/05/13 12:13:08 INFO SocketServer: [SocketServer brokerId=0] Stopped socket server request processors
19/05/13 12:13:08 INFO KafkaRequestHandlerPool: [data-plane Kafka Request Handler on Broker 0], shutting down
19/05/13 12:13:08 INFO KafkaRequestHandlerPool: [data-plane Kafka Request Handler on Broker 0], shut down completely
19/05/13 12:13:08 INFO KafkaApis: [KafkaApi-0] Shutdown complete.
19/05/13 12:13:08 INFO DelayedOperationPurgatory$ExpiredOperationReaper: [ExpirationReaper-0-topic]: Shutting down
19/05/13 12:13:08 INFO DelayedOperationPurgatory$ExpiredOperationReaper: [ExpirationReaper-0-topic]: Stopped
19/05/13 12:13:08 INFO DelayedOperationPurgatory$ExpiredOperationReaper: [ExpirationReaper-0-topic]: Shutdown completed
19/05/13 12:13:08 INFO TransactionCoordinator: [TransactionCoordinator id=0] Shutting down.

And of course all this generates an error in Spark:
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.internal.SessionStateBuilder'

Do you know why the Kafka server is shutting down this soon?
Or am I doing it completely wrong?

Many thanks

Embedded kafka not starting after upgrading spring-boot-starter version to 2.3.12

When migrating the spring-boot-starter version from 2.1.10.RELEASE to 2.3.12.RELEASE, we are facing the issue below (see the stacktrace). Here is the Maven dependency tree:

XXXXXXXXXXXX:tokenization-streaming-service XXXXX$ mvn dependency:tree | grep kafka
[INFO] +- org.springframework.kafka:spring-kafka:jar:2.5.14.RELEASE:compile
[INFO] | - org.apache.kafka:kafka-clients:jar:2.5.1:compile
[INFO] | +- org.apache.kafka:kafka_2.11:jar:1.0.1:compile
[INFO] +- io.confluent:kafka-avro-serializer:jar:4.0.1:compile
[INFO] | +- io.confluent:kafka-schema-registry-client:jar:4.0.1:compile
[INFO] +- org.springframework.kafka:spring-kafka-test:jar:2.5.14.RELEASE:test
[INFO] | +- org.apache.kafka:kafka-clients:jar:test:2.5.1:test
[INFO] | +- org.apache.kafka:kafka-streams:jar:2.5.1:test
[INFO] | | +- org.apache.kafka:connect-json:jar:2.5.1:test
[INFO] | | | - org.apache.kafka:connect-api:jar:2.5.1:test
[INFO] | +- org.apache.kafka:kafka-streams-test-utils:jar:2.5.1:test
[INFO] | +- org.apache.kafka:kafka_2.12:jar:2.5.1:test
[INFO] | +- org.apache.kafka:kafka_2.12:jar:test:2.5.1:test

Stacktrace

16:46:05.373 [main] DEBUG org.springframework.test.annotation.ProfileValueUtils - Retrieved ProfileValueSource type [class org.springframework.test.annotation.SystemProfileValueSource] for class [router.integration.testing.EventStreamIT]
16:46:05.376 [main] DEBUG org.springframework.test.annotation.ProfileValueUtils - Retrieved @ProfileValueSourceConfiguration [null] for test class [router.integration.testing.EventStreamIT]
16:46:05.376 [main] DEBUG org.springframework.test.annotation.ProfileValueUtils - Retrieved ProfileValueSource type [class org.springframework.test.annotation.SystemProfileValueSource] for class [router.integration.testing.EventStreamIT]
16:46:05.406 [main] DEBUG org.springframework.test.annotation.ProfileValueUtils - Retrieved @ProfileValueSourceConfiguration [null] for test class [router.integration.testing.EventStreamIT]
16:46:05.407 [main] DEBUG org.springframework.test.annotation.ProfileValueUtils - Retrieved ProfileValueSource type [class org.springframework.test.annotation.SystemProfileValueSource] for class [router.integration.testing.EventStreamIT]
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.575 s <<< FAILURE! - in router.integration.testing.EventStreamIT
[ERROR] router.integration.testing.EventStreamIT Time elapsed: 0.575 s <<< ERROR!
java.lang.NoSuchMethodError: kafka.utils.Logging.$init$(Lkafka/utils/Logging;)V

[INFO]
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] EventStreamIT » NoSuchMethod kafka.utils.Logging.$init$(Lkafka/utils/Logging;)...
[INFO]
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0

ZooKeeperClientTimeoutException when starting the server

I am using version 2.4.1.1.

Every once in a while, when calling EmbeddedKafka.start() during unit tests, I receive the following error: kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING.

Misalignment of name for the Schema Registry module

In 162c5e1, @NeQuissimus updated the README.md, setting embeddedkafka-confluent as the new name for the Schema Registry module, though the value in build.sbt was set to embeddedkafka-schema-registry.

Which one do you think would be the most correct? I'm afraid of consequences if we decide to mention Confluent...

I'd like to discuss this along with #3.

My suggestions:

  • "io.github.embeddedkafka" %% "embedded-kafka"
  • "io.github.embeddedkafka" %% "embedded-kafka-streams"
  • "io.github.embeddedkafka" %% "embedded-kafka-schema-registry"
