Comments (4)
Hi @tharunrajguptayada, could you please provide a snippet of your code?
from embedded-kafka-schema-registry.
Yeah, sure. I am attaching my code below.
class AvroMockTesting extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {
  val schema: Schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  // "namespace": "com.example",
      |  "name": "Customer",
      |  "version": "1",
      |  "fields": [
      |    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
      |    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
      |  ]
      |}""".stripMargin
  )

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, schemaRegistryPort = 8081)

  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
  }

  override def afterAll(): Unit = {
    EmbeddedKafka.stop()
    super.afterAll()
  }

  "runs with embedded kafka and Schema Registry" should {
    implicit val serializer: Serializer[GenericRecord] = {
      val props = Map(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
      val ser = new KafkaAvroSerializer
      ser.configure(props.asJava, false)
      ser.asInstanceOf[Serializer[GenericRecord]]
    }

    implicit val deSerializer: Deserializer[GenericRecord] = {
      val props = Map(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
      val ser = new KafkaAvroDeserializer()
      ser.configure(props.asJava, false)
      ser.asInstanceOf[Deserializer[GenericRecord]]
    }

    val topic = "input_topic"

    "work" in {
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("first_name", "yada")
      record.put("automated_email", false)

      EmbeddedKafka.withProducer[String, GenericRecord, Unit](producer => {
        producer.send(new ProducerRecord[String, GenericRecord](topic, "abc", record))
        producer.flush()
      })

      val listOfTopics = Collections.singletonList(topic)
      EmbeddedKafka.withConsumer[String, GenericRecord, Unit](consumer => {
        consumer.subscribe(listOfTopics)
        var i = 10
        while (i >= 0) {
          val records = consumer.poll(Duration.ofMillis(1000L))
          records.forEach(record => println("consumed message:" + record.key() + ", " + record.value()))
          i -= 1
        }
      })
    }
  }
}
I just tried running your code and it works for me... here it is, reformatted and with the imports I used.
import io.confluent.kafka.serializers.{
  AbstractKafkaSchemaSerDeConfig,
  KafkaAvroDeserializer,
  KafkaAvroSerializer
}
import io.github.embeddedkafka.Codecs._
// Needed when the test lives outside the io.github.embeddedkafka.schemaregistry package
import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import java.time.Duration
import java.util.Collections
import scala.jdk.CollectionConverters._

class AvroMockTesting extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {
  val schema: Schema = new Schema.Parser().parse(
    """{ "type": "record", "namespace": "com.example", "name": "Customer", "version": "1", "fields": [ { "name": "first_name", "type": "string", "doc": "First Name of Customer" }, { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" } ]}"""
  )

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, schemaRegistryPort = 8081)

  // Start the embedded broker and Schema Registry once for the whole spec
  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
  }

  override def afterAll(): Unit = {
    EmbeddedKafka.stop()
    super.afterAll()
  }

  "runs with embedded kafka and Schema Registry" should {
    implicit val serializer: Serializer[GenericRecord] = {
      val props = Map(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
      val ser = new KafkaAvroSerializer
      ser.configure(props.asJava, false)
      ser.asInstanceOf[Serializer[GenericRecord]]
    }

    implicit val deSerializer: Deserializer[GenericRecord] = {
      val props = Map(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
      val ser = new KafkaAvroDeserializer()
      ser.configure(props.asJava, false)
      ser.asInstanceOf[Deserializer[GenericRecord]]
    }

    val topic = "input_topic"

    "work" in {
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("first_name", "yada")
      record.put("automated_email", false)

      EmbeddedKafka.withProducer[String, GenericRecord, Unit](producer => {
        producer.send(
          new ProducerRecord[String, GenericRecord](topic, "abc", record)
        )
        producer.flush()
      })

      val listOfTopics = Collections.singletonList(topic)
      EmbeddedKafka.withConsumer[String, GenericRecord, Unit](consumer => {
        consumer.subscribe(listOfTopics)
        var i = 10
        while (i >= 0) {
          val records = consumer.poll(Duration.ofMillis(1000L))
          records.forEach(record =>
            println("consumed message:" + record.key() + ", " + record.value())
          )
          i -= 1
        }
      })
    }
  }
}
import io.github.embeddedkafka.schemaregistry.EmbeddedKafkaConfig
import io.github.embeddedkafka.schemaregistry.EmbeddedKafka
I have tried with these imports but am still getting the same error. Maybe I will try on another Windows machine and check.
Thanks for your response, sir.
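Since the error itself is not shown in this thread, one way to narrow it down might be to check the schema and record without Kafka or the Schema Registry at all. The sketch below round-trips the same record through plain Avro binary encoding. It assumes only the Avro library is on the classpath; the names `AvroRoundTrip` and `roundTrip` are hypothetical and not part of embedded-kafka-schema-registry.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import java.io.ByteArrayOutputStream

// Hypothetical helper: exercises Avro only, with no broker or registry involved.
object AvroRoundTrip {
  val schema: Schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "namespace": "com.example",
      |  "name": "Customer",
      |  "fields": [
      |    { "name": "first_name", "type": "string" },
      |    { "name": "automated_email", "type": "boolean", "default": true }
      |  ]
      |}""".stripMargin
  )

  // Encode the record to Avro binary, then decode it with the same schema.
  def roundTrip(record: GenericRecord): GenericRecord = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()

    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(schema)
    record.put("first_name", "yada")
    record.put("automated_email", false)

    val decoded = roundTrip(record)
    // Avro decodes strings as Utf8, so compare via toString.
    assert(decoded.get("first_name").toString == "yada")
    assert(decoded.get("automated_email") == java.lang.Boolean.FALSE)
    println("Avro round trip succeeded: " + decoded)
  }
}
```

If this passes on the failing machine, the schema and record are probably fine, and the problem is more likely in the embedded broker or Schema Registry startup (for example, ports 9092 or 8081 already being in use).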