Giter Site home page Giter Site logo

Comments (4)

francescopellegrini avatar francescopellegrini commented on July 19, 2024

Hi @tharunrajguptayada, could you please provide a snippet of your code?

from embedded-kafka-schema-registry.

tharunrajguptayada avatar tharunrajguptayada commented on July 19, 2024

Yeah, sure. I am attaching my code below:
`class AvroMockTesting extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {

// Avro schema for a "Customer" record with two fields: first_name (string) and
// automated_email (boolean, default true).
// NOTE(review): as pasted, the quotes inside this literal are unescaped and the JSON
// contains a `//` line; presumably escaping was lost when the snippet was rendered.
// Confirm against the real source file before relying on this text.
val schema: Schema = new Schema.Parser().parse("{\n "type": "record",\n // "namespace": "com.example",\n "name": "Customer",\n "version": "1",\n "fields": [\n { "name": "first_name", "type": "string", "doc": "First Name of Customer" },\n { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }\n ]\n}")

// Implicit configuration picked up by the EmbeddedKafka calls below:
// Kafka on port 9092, Schema Registry on port 8081.
implicit val config:EmbeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 9092, schemaRegistryPort = 8081)

// Starts the embedded services once, before any test in this suite runs.
override def beforeAll(): Unit = {
super.beforeAll()
EmbeddedKafka.start()
}

// Stops the embedded services after the last test in this suite.
override def afterAll(): Unit = {
EmbeddedKafka.stop()
super.afterAll()
}

"runs with embedded kafka and Schema Registry" should {

// Avro value serializer configured with the registry URL; the `false` argument is
// the isKey flag of Kafka's Serializer.configure, i.e. this serializes values.
// The URL repeats the 8081 port from `config` as a string literal.
implicit val serializer: Serializer[GenericRecord] = {
  val props = Map(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
  )
  val ser = new KafkaAvroSerializer
  ser.configure(props.asJava, false)
  ser.asInstanceOf[Serializer[GenericRecord]]
}

// Avro value deserializer, configured the same way as the serializer above.
implicit val deSerializer: Deserializer[GenericRecord] = {
  val props = Map(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
  )
  val ser = new KafkaAvroDeserializer()
  ser.configure(props.asJava, false)
  ser.asInstanceOf[Deserializer[GenericRecord]]
}
// Topic used for both producing and consuming in the test below.
val topic ="input_topic"


"work" in {
  // Build one record that matches `schema`.
  val record: GenericRecord = new GenericData.Record(schema)
  record.put("first_name", "yada")
  record.put("automated_email", false)
  // Publish the record under key "abc"; the result of send() is not inspected.
  EmbeddedKafka.withProducer[String,GenericRecord,Unit]( producer => {
    producer.send(new ProducerRecord[String,GenericRecord](topic,"abc",record))
    producer.flush()
  })

  val listOfTopics = Collections.singletonList(topic)
  EmbeddedKafka.withConsumer[String,GenericRecord,Unit](consumer => {
    consumer.subscribe(listOfTopics)
    // Polls 11 times (i runs from 10 down to 0) with a 1000 ms timeout each and prints
    // whatever arrives. NOTE(review): nothing is asserted, so the test passes even
    // when no record is consumed.
    var i = 10
    while(i>=0)
      {
        val records = consumer.poll(Duration.ofMillis(1000L))
        records.forEach(record => println("consumed message:" + record.key() +", " + record.value()))
        i-=1
      }
  })

}

}

}`

from embedded-kafka-schema-registry.

francescopellegrini avatar francescopellegrini commented on July 19, 2024

I just tried running your code and it works for me... here it is, reformatted and with the imports I used.

import io.confluent.kafka.serializers.{
  AbstractKafkaSchemaSerDeConfig,
  KafkaAvroDeserializer,
  KafkaAvroSerializer
}
import io.github.embeddedkafka.Codecs._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.time.Duration
import java.util.Collections
import scala.jdk.CollectionConverters._

/** Round-trip check for Avro `GenericRecord`s through an embedded Kafka broker and Schema
  * Registry.
  *
  * Both services are started once for the whole suite, on the ports declared in [[config]], and
  * stopped again after the last test.
  */
class AvroMockTesting extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {

  /** Avro schema of the `Customer` record produced and consumed by the test. */
  val schema: Schema = new Schema.Parser().parse(
    """{ "type": "record", "namespace": "com.example", "name": "Customer", "version": "1", "fields": [ { "name": "first_name", "type": "string", "doc": "First Name of Customer" }, { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" } ]}"""
  )

  /** Ports for the embedded broker and registry; picked up implicitly by `EmbeddedKafka`. */
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, schemaRegistryPort = 8081)

  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
  }

  // try/finally so the parent hook still runs when stopping the embedded services throws.
  override def afterAll(): Unit =
    try EmbeddedKafka.stop()
    finally super.afterAll()

  "runs with embedded kafka and Schema Registry" should {

    // Single source of truth for the serde settings: the registry port is taken from `config`
    // instead of being repeated as a string literal, so the two cannot drift apart.
    val serdeProps = Map(
      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
        s"http://localhost:${config.schemaRegistryPort}"
    ).asJava

    // KafkaAvroSerializer is a Serializer[Object]; the cast narrows it to the value type that
    // withProducer resolves implicitly. The second argument of configure is the isKey flag.
    implicit val serializer: Serializer[GenericRecord] = {
      val avroSerializer = new KafkaAvroSerializer
      avroSerializer.configure(serdeProps, false)
      avroSerializer.asInstanceOf[Serializer[GenericRecord]]
    }

    // Same narrowing as above, for the consuming side.
    implicit val deSerializer: Deserializer[GenericRecord] = {
      val avroDeserializer = new KafkaAvroDeserializer()
      avroDeserializer.configure(serdeProps, false)
      avroDeserializer.asInstanceOf[Deserializer[GenericRecord]]
    }

    val topic = "input_topic"

    "work" in {
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("first_name", "yada")
      record.put("automated_email", false)

      EmbeddedKafka.withProducer[String, GenericRecord, Unit] { producer =>
        // Wait on the returned Future so a failed delivery fails the test here instead of
        // being silently dropped.
        producer
          .send(new ProducerRecord[String, GenericRecord](topic, "abc", record))
          .get()
        ()
      }

      // Upper bound on waiting: at most 11 polls of one second each, as before.
      val maxPolls    = 11
      val pollTimeout = Duration.ofMillis(1000L)

      val consumed =
        EmbeddedKafka.withConsumer[String, GenericRecord, Vector[(String, GenericRecord)]] {
          consumer =>
            consumer.subscribe(Collections.singletonList(topic))
            // The iterator is lazy, so polling stops at the first non-empty batch.
            Iterator
              .continually(consumer.poll(pollTimeout))
              .take(maxPolls)
              .map(_.asScala.map(r => r.key() -> r.value()).toVector)
              .find(_.nonEmpty)
              .getOrElse(Vector.empty)
        }

      consumed.foreach { case (key, value) => println(s"consumed message:$key, $value") }

      consumed match {
        case Seq((key, value)) =>
          key shouldBe "abc"
          // Avro hands strings back as Utf8, hence the toString before comparing.
          Option(value.get("first_name")).map(_.toString) shouldBe Some("yada")
          value.get("automated_email") shouldBe java.lang.Boolean.FALSE
        case other =>
          fail(s"expected exactly one record on $topic but consumed ${other.size}")
      }
    }
  }

}

from embedded-kafka-schema-registry.

tharunrajguptayada avatar tharunrajguptayada commented on July 19, 2024

import io.github.embeddedkafka.schemaregistry.EmbeddedKafkaConfig
import io.github.embeddedkafka.schemaregistry.EmbeddedKafka

I have tried with these imports, but I am still getting the same error. Maybe I will try on another Windows machine and check.

Thanks for your response sir

from embedded-kafka-schema-registry.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Something interesting about the web. A new door to the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Something interesting about games that makes everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.