ovotech / kafka-serialization

Lego bricks to build Apache Kafka serializers and deserializers

License: Apache License 2.0


kafka-serialization's Introduction

Kafka serialization/deserialization building blocks


The aim of this library is to provide the Lego™ bricks to build a serializer/deserializer for Kafka messages.

The serializers/deserializers built by this library cannot be set in the Kafka configuration through properties; they need to be passed to the KafkaProducer/KafkaConsumer constructors (which is a feature, IMHO).

For Avro serialization this library uses Avro4s, while for JSON it supports Json4s, Circe and Spray out of the box. It is quite easy to add support for other libraries as well.

Modules

The library is composed of the following modules:

  • kafka-serialization-core: provides the serialization primitives to build serializers and deserializers.
  • kafka-serialization-cats: provides Cats typeclass instances for serializers and deserializers.
  • kafka-serialization-json4s: provides a serializer and deserializer based on Json4s.
  • kafka-serialization-jsoniter-scala: provides a serializer and deserializer based on Jsoniter Scala.
  • kafka-serialization-spray: provides a serializer and deserializer based on Spray JSON.
  • kafka-serialization-circe: provides a serializer and deserializer based on Circe.
  • kafka-serialization-avro: provides the schema-registry client settings.
  • kafka-serialization-avro4s: provides a serializer and deserializer based on Avro4s 1.x.
  • kafka-serialization-avro4s2: provides a serializer and deserializer based on Avro4s 2.x.

The Avro4s serialization supports schema evolution through the schema registry. The consumer can provide its own schema, and Avro will take care of the conversion.

Getting Started

  • The library is available in the Kaluza artifactory repository.
  • See here for the latest version.
  • Add this snippet to your build.sbt to use it:
import sbt._
import sbt.Keys._

resolvers += "Artifactory" at "https://kaluza.jfrog.io/artifactory/maven"

libraryDependencies ++= {
  val kafkaSerializationV = "0.5.25"
  Seq(
    "com.ovoenergy" %% "kafka-serialization-core" % kafkaSerializationV,
    "com.ovoenergy" %% "kafka-serialization-circe" % kafkaSerializationV, // To provide Circe JSON support
    "com.ovoenergy" %% "kafka-serialization-json4s" % kafkaSerializationV, // To provide Json4s JSON support
    "com.ovoenergy" %% "kafka-serialization-jsoniter-scala" % kafkaSerializationV, // To provide Jsoniter Scala JSON support
    "com.ovoenergy" %% "kafka-serialization-spray" % kafkaSerializationV, // To provide Spray-json JSON support
    "com.ovoenergy" %% "kafka-serialization-avro4s" % kafkaSerializationV // To provide Avro4s Avro support
  )
}

Circe example

Circe is a JSON library for Scala that provides support for generic programming through Shapeless. You can find more information on the Circe website.

Simple serialization/deserialization example with Circe:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

case class UserCreated(id: String, name: String, age: Int)

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit], 
  circeJsonSerializer[UserCreated]
)

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  circeJsonDeserializer[UserCreated]
)
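
As a quick usage sketch (the topic name user-created is just an example, this assumes a broker on localhost:9092, and poll(Duration) assumes kafka-clients 2.x or later), the producer and consumer above work like any other Kafka client:

import java.time.Duration
import org.apache.kafka.clients.producer.ProducerRecord

// Send a value-only record; the Unit key is handled by nullSerializer.
producer.send(new ProducerRecord[Unit, UserCreated]("user-created", UserCreated("user-1", "Jane Doe", 42)))
producer.flush()

// Read it back; the value is deserialized into a UserCreated instance.
consumer.subscribe(List("user-created").asJava)
consumer.poll(Duration.ofSeconds(1)).asScala.foreach(record => println(record.value()))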

Jsoniter Scala example

Jsoniter Scala is a library that generates codecs for case classes, standard types and collections, aiming for maximum performance of JSON parsing and serialization.

Here is an example of serialization/deserialization with Jsoniter Scala:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.jsoniter_scala._

// Import the Jsoniter Scala macros & core support
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.github.plokhotnyuk.jsoniter_scala.core._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

case class UserCreated(id: String, name: String, age: Int)

implicit val userCreatedCodec: JsonValueCodec[UserCreated] = JsonCodecMaker.make[UserCreated](CodecMakerConfig)

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],
  jsoniterScalaSerializer[UserCreated]()
)

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  jsoniterScalaDeserializer[UserCreated]()
)
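
The serializer and deserializer above rely on the implicit JsonValueCodec. As a rough illustration using the plain Jsoniter Scala core API (independent of this library), the same codec can round-trip a value directly:

// Round-trip a value through UTF-8 JSON bytes using the implicit codec defined above.
val jsonBytes: Array[Byte] = writeToArray(UserCreated("user-1", "Jane Doe", 42))
val roundTripped: UserCreated = readFromArray[UserCreated](jsonBytes)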

Avro example

Apache Avro is a remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.

Apache Avro provides support for evolving your messages across multiple versions without breaking compatibility with older or newer consumers. It supports several encoding formats, but two are most commonly used in Kafka: binary and JSON.

The encoded data is always validated and parsed using a schema (defined in JSON) and, if needed, evolved to the reader schema version.

This library provides Avro support through the Avro4s library, which uses macros and Shapeless to allow effortless serialization and deserialization. In addition to Avro4s, it needs a Confluent Schema Registry in place, which provides a way to control the format of the messages produced in Kafka. You can find more information in the Confluent Schema Registry documentation.

An example with Avro4s binary and Schema Registry:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._

import com.sksamuel.avro4s._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

val schemaRegistryEndpoint = "http://localhost:8081"

case class UserCreated(id: String, name: String, age: Int)

// This type class is needed by the avroBinarySchemaIdSerializer
implicit val UserCreatedToRecord = ToRecord[UserCreated]

val producer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit], 
  avroBinarySchemaIdSerializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

// This type class is needed by the avroBinarySchemaIdDeserializer
implicit val UserCreatedFromRecord = FromRecord[UserCreated]

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

This Avro serializer will try to register the schema for every new message type it serializes and will cache the obtained schema id. The deserializer will contact the schema registry each time it encounters a message with a schema id it has not seen before.

The schema id is encoded in the first 4 bytes of the payload. The deserializer extracts the schema id from the payload and fetches the corresponding schema from the schema registry. The deserializer is able to evolve the original message to the consumer schema. This is useful when the consumer is only interested in a part of the original message (schema projection) or when the original message is in an older or newer format than the consumer schema (schema evolution).
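
As a minimal sketch of that layout (assuming includesFormatByte = false, so the payload starts directly with the 4-byte schema id followed by the Avro binary body), the schema id can be peeked at like this:

import java.nio.ByteBuffer

// Read the schema id stored in the first 4 bytes of a serialized payload.
def schemaIdOf(payload: Array[Byte]): Int =
  ByteBuffer.wrap(payload, 0, 4).getInt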

An example of the consumer schema:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._

import com.sksamuel.avro4s._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._

import scala.collection.JavaConverters._

val schemaRegistryEndpoint = "http://localhost:8081"

/* Assuming the original message has been serialized using the 
 * previously defined UserCreated class. We are going to project
 * it ignoring the value of the age
 */
case class UserCreated(id: String, name: String)

// This type class is needed by the avroBinarySchemaIdDeserializer
implicit val UserCreatedFromRecord = FromRecord[UserCreated]


/* This type class is needed by the avroBinarySchemaIdDeserializer
 * to obtain the consumer schema
 */
implicit val UserCreatedSchemaFor = SchemaFor[UserCreated]

val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdWithReaderSchemaDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false)
)

Format byte

The original Confluent Avro serializer/deserializer prefixes the payload with a "magic" byte to identify that the message has been written with the Avro serializer.

Similarly, this library supports the same mechanism by means of a couple of functions. It is even able to multiplex and demultiplex different serializers/deserializers based on that format byte. At the moment the supported formats are:

  • JSON
  • Avro Binary with schema ID
  • Avro JSON with schema ID

Let's see this mechanism in action:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.avro4s._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs._
import scala.collection.JavaConverters._


sealed trait Event
case class UserCreated(id: String, name: String, email: String) extends Event

val schemaRegistryEndpoint = "http://localhost:8081"

/* This producer will produce messages in Avro binary format */
val avroBinaryProducer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],   
  formatSerializer(Format.AvroBinarySchemaId, avroBinarySchemaIdSerializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false))
)

/* This producer will produce messages in Json format */
val circeProducer = new KafkaProducer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava, 
  nullSerializer[Unit],   
  formatSerializer(Format.Json, circeJsonSerializer[UserCreated])
)

/* This consumer will be able to consume messages from both producers */
val consumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  formatDemultiplexerDeserializer[UserCreated](unknownFormat => failingDeserializer(new RuntimeException("Unsupported format"))){
    case Format.Json => circeJsonDeserializer[UserCreated]
    case Format.AvroBinarySchemaId => avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = false)
  }
)

/* This consumer will be able to consume messages in Avro binary format with the magic format byte at the start */
val avroBinaryConsumer = new KafkaConsumer(
  Map[String, AnyRef](BOOTSTRAP_SERVERS_CONFIG->"localhost:9092").asJava,
  nullDeserializer[Unit],
  avroBinarySchemaIdDeserializer[UserCreated](schemaRegistryEndpoint, isKey = false, includesFormatByte = true)
)

Note that the formatDemultiplexerDeserializer is a little bit awkward because it is invariant in the type T, so all the demultiplexed deserializers must be declared as Deserializer[T].

There are other supporting serializers and deserializers; you can discover them by looking through the code and the tests.

Useful de-serializers

In the core module there are plenty of serializers and deserializers that handle generic cases.

Optional deserializer

To handle the case in which the data is null, you need to wrap the deserializer in the optionalDeserializer:

import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.circe._

// Import the Circe generic support
import io.circe.generic.auto._
import io.circe.syntax._

import org.apache.kafka.common.serialization.Deserializer

case class UserCreated(id: String, name: String, age: Int)

val userCreatedDeserializer: Deserializer[Option[UserCreated]] = optionalDeserializer(circeJsonDeserializer[UserCreated])
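
For example (a small sketch using the standard Kafka Deserializer API, with an arbitrary topic name), a null payload maps to None instead of blowing up:

// A null payload yields None; a real payload yields Some(UserCreated(...)).
val missing: Option[UserCreated] = userCreatedDeserializer.deserialize("user-created", null)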

Cats instances

The cats module provides a Functor typeclass instance for Deserializer and a Contravariant instance for Serializer. This allows you to do the following:

import cats.implicits._
import com.ovoenergy.kafka.serialization.core._
import com.ovoenergy.kafka.serialization.cats._
import org.apache.kafka.common.serialization.{Serializer, Deserializer, IntegerSerializer, IntegerDeserializer}

val intDeserializer: Deserializer[Int] = (new IntegerDeserializer).asInstanceOf[Deserializer[Int]]
val stringDeserializer: Deserializer[String] = intDeserializer.map(_.toString)
 
val intSerializer: Serializer[Int] = (new IntegerSerializer).asInstanceOf[Serializer[Int]]
val stringSerializer: Serializer[String] = intSerializer.contramap(_.toInt)
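
As a quick check (a small sketch exercising the standard Serializer/Deserializer interface directly), the mapped instances round-trip through the underlying integer encoding:

// "42" is written as a 4-byte integer by the underlying IntegerSerializer
// and read back as a String by the mapped deserializer.
val encoded: Array[Byte] = stringSerializer.serialize("some-topic", "42")
val decoded: String = stringDeserializer.deserialize("some-topic", encoded) // "42"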

Complaints and other Feedback

Feedback of any kind is always appreciated.

Issues and PRs are welcome as well.

About this README

The code samples in this README file are checked using mdoc.

This means that the README.md file is generated from docs/src/README.md. If you want to make any changes to the README, you should:

  1. Edit docs/src/README.md
  2. Run sbt mdoc to regenerate ./README.md
  3. Commit both files to git

kafka-serialization's People

Contributors

arhelmus, bfil, cb372, d-fernandes, david-mcgillicuddy-ovo, filosganga, gitter-badger, jadireddi, jiminhsieh, joedarby, l-catallo, olib963, oliverlockwood, piotrfras, plokhotnyuk, scala-steward


kafka-serialization's Issues

Incompatibility with Kafka-connect

Hi,
The library is very useful for Scala developers, but there is an issue with the idea of putting the schema version in the payload: it becomes incompatible with the rest of the Confluent platform.
When I push a message using avroBinarySchemaIdSerializer I'm able to consume it with the corresponding deserializer, but Kafka Connect fails and I get this error:

org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 256
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

Is there any other way to use the library and keep compatibility with Kafka Connect?
Thanks

Add support for demultiplexing on a header value

Given the new header support in Kafka, it would be good to have a deserializer that demultiplexes over a header value.

e.g. we could have a Content-Type header and apply a different deserializer based on its value.

Remove JerseySchemaRegistryClient

The only purpose of the JerseySchemaRegistryClient was to provide authentication to the schema registry client; the standard one did not provide any means to authenticate.

The default schema registry client now provides an authentication mechanism, so the Jersey-based schema registry client is no longer needed.

API inconsistencies

There are a couple of unnecessary API inconsistencies:

  • In a few places, such as here, a parameter named includeFormatByte is used.
  • In a few places, such as here, the equivalent parameter is named includesFormatByte instead.

I'm happy to raise a PR to correct this, if you tell me which of the two you'd like to standardise towards.

Missing the Confluent resolver

Please add it to build.sbt:

resolvers ++= Seq(
  "confluent-release" at "http://packages.confluent.io/maven/"
)

Unable to configure underlying KafkaAvroSerializer

Confluent's schema-registry version 4.0.0 contains a new property, auto.register.schemas, which we intend to use.

We would like to add the ability to configure the underlying KafkaAvroSerializer. In our specific case we will be able to set auto.register.schemas to false and anyone else would be able to configure the serializer how they see fit.

Currently the underlying KafkaAvroSerializer is configured with defaults. We do not have access to this serializer to configure it.

Calling configure on the serializer returned by #avroBinarySchemaIdSerializer is of no use as this method is purposely not implemented:

// Keep in mind that for serializer built with this library Kafka will never call `configure`.
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

Add a deserializer that returns Either[ConsumerRecord[K, T], T]

In some cases, you want to have the original ConsumerRecord when deserialization fails. At the moment this is not possible.

We would like to write a deserializer that, given a ConsumerRecord[K, V], will return Either[ConsumerRecord[K, V], V] for the value (or Either[ConsumerRecord[K, V], K] for the key).

Support for avro4s 2+

We've updated avro4s to 2.0.2 as they've added support for deserialising instant values, but this looks to have broken something.

[error] sbt.ForkMain$ForkError: java.lang.NoSuchMethodError: com.sksamuel.avro4s.ToRecord.apply(Ljava/lang/Object;)Lorg/apache/avro/generic/GenericRecord;
[error]         at com.ovoenergy.kafka.serialization.avro4s.Avro4sSerialization$$anonfun$avroBinarySchemaIdSerializerWithProps$1.apply(Avro4sSerialization.scala:284)
[error]         at com.ovoenergy.kafka.serialization.avro4s.Avro4sSerialization$$anonfun$avroBinarySchemaIdSerializerWithProps$1.apply(Avro4sSerialization.scala:283)
[error]         at com.ovoenergy.kafka.serialization.core.Serialization$$anon$1.serialize(Serialization.scala:41)

From what we could gather, it looks like apply on FromRecord and ToRecord in avro4s 2+ expects extra parameters compared to the one in 1.9.1.
1.9.1
https://github.com/sksamuel/avro4s/blob/v1.9.1/avro4s-macros/src/main/scala/com/sksamuel/avro4s/ToRecord.scala#L204
v2.0.2
https://github.com/sksamuel/avro4s/blob/v2.0.2/avro4s-core/src/main/scala/com/sksamuel/avro4s/ToRecord.scala#L23
