
srclient's Introduction

Schema Registry Client for Go


srclient is a Golang client for Schema Registry, software that provides a RESTful interface for developers to define standard schemas for their events, share them across the organization, and safely evolve them in a way that is backward compatible and future-proof. Using this client, developers can build Golang programs that write and read schema-compatible records to/from Apache Kafka using Avro, Protobuf, and JSON Schema, while Schema Registry manages the schemas. In this architecture, producer programs interact with Schema Registry to retrieve schemas and use them to serialize records. Consumer programs then retrieve the same schema from Schema Registry to deserialize the records. You can read more about the benefits of using Schema Registry here.

Features

  • Simple to Use - This client provides a high-level abstraction over the operations that developers writing programs for Apache Kafka typically need, so its functions feel natural to use and no low-level HTTP handling is required to communicate with Schema Registry.
  • Performance - This client provides caching capabilities: any data retrieved from Schema Registry can be cached locally to improve the performance of subsequent requests, which reduces latency for programs not co-located with Schema Registry. Caching can be disabled programmatically (see the sketch below).
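
A minimal sketch of turning the cache off; the CachingEnabled toggle shown here is assumed from the client's API, so verify it against the srclient version you use:

package main

import "github.com/riferrei/srclient"

func main() {
	client := srclient.CreateSchemaRegistryClient("http://localhost:8081")
	// Disable the local cache so every request goes to Schema Registry.
	// (CachingEnabled is assumed here; check your srclient version.)
	client.CachingEnabled(false)
}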

License: Apache License v2.0

Installation

Module install:

This client is a Go module, so you can use it simply by adding the following import to your code:

import "github.com/riferrei/srclient"

Then run a build to have this client automatically added to your go.mod file as a dependency.

Manual install:

go get -u github.com/riferrei/srclient

Testing

Unit tests can be run with the standard go test command:

go test -cover -v ./...

You can also run the integration tests on your local machine, provided you have Docker installed:

docker compose up --exit-code-from srclient-integration-test
docker compose down --rmi local

Getting Started & Examples
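
A minimal Avro producer-side sketch, assembled from the examples in the issue reports further down this page. The Schema Registry URL is assumed to be http://localhost:8081, and method signatures vary slightly between srclient versions (older releases take an extra isKey argument):

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"

	"github.com/riferrei/srclient"
)

func main() {
	client := srclient.CreateSchemaRegistryClient("http://localhost:8081")

	// Fetch the latest schema for the subject, creating it if absent.
	subject := "myTopic"
	schema, err := client.GetLatestSchema(subject)
	if schema == nil {
		schemaText := `{"type":"record","name":"sample","fields":[{"name":"id","type":"int"}]}`
		schema, err = client.CreateSchema(subject, schemaText, srclient.Avro)
		if err != nil {
			panic(err)
		}
	}

	// Confluent wire format: magic byte (0) + 4-byte schema ID + Avro binary payload.
	value, _ := json.Marshal(map[string]interface{}{"id": 1})
	native, _, _ := schema.Codec().NativeFromTextual(value)
	payload, _ := schema.Codec().BinaryFromNative(nil, native)

	record := make([]byte, 0, 5+len(payload))
	record = append(record, 0)
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))
	record = append(record, schemaIDBytes...)
	record = append(record, payload...)

	fmt.Printf("record bytes ready to be produced: %v\n", record)
}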

Acknowledgements


srclient's Issues

Support Protobuf Codec

It would be great if this client supported a codec for Protobuf. That would make it possible to serialize/deserialize messages without statically typed message transformation.

Protobuf usage example

Hey, I am trying to figure out if I can use this client to fetch Protobuf schemas in a Go consumer. Does anyone have any examples of how to accomplish this?
Without running the protoc CLI to pre-generate the code, I am not sure how this would work at runtime.

Any pointers or examples would be appreciated!
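
Not a full answer, but a hedged sketch of the srclient side: you can pull the registered .proto source and its type for a consumed message by schema ID. Turning that source into a dynamic message at runtime still requires a separate descriptor/parsing library (for example something like jhump/protoreflect); srclient itself only hands back the schema text and its references.

package consumer

import (
	"encoding/binary"
	"fmt"

	"github.com/riferrei/srclient"
)

// printProtoSchema looks up the Protobuf schema behind a consumed record.
// Confluent wire format: magic byte + 4-byte schema ID + payload.
func printProtoSchema(client *srclient.SchemaRegistryClient, msgValue []byte) error {
	schemaID := binary.BigEndian.Uint32(msgValue[1:5])
	schema, err := client.GetSchema(int(schemaID))
	if err != nil {
		return err
	}
	fmt.Println(schema.SchemaType().String()) // PROTOBUF
	fmt.Println(schema.Schema())              // the .proto source registered under this ID
	return nil
}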

Avro serialization junk value in confluent cloud

I am using this client to create the schema and send Avro messages to a topic.
This works fine on my local machine (macOS) and on my GCP compute instance, where a downloaded version of Confluent is running.

After the trials I got Confluent Cloud access, and basic Avro serialization doesn't seem to work as expected there. The value section is always prefixed with junk characters and doesn't show the same behaviour that worked on my local machine and my single GCP machine.

Here are the details, the code snippet, and the process that was followed.

  • Created a topic in Confluent Cloud named sample_topic_003
  • Schema file for Avro:
{
	"doc": "Sample schema to help you get started...",
	"fields": [
	  {
		"doc": "The int type is a 32-bit signed integer.",
		"name": "id",
		"type": "int"
	  },
	  {
		"doc": "The string is a unicode character sequence.",
		"name": "name",
		"type": "string"
	  }
	],
	"name": "my_sample_schema_three",
	"namespace": "com.mycorp.mynamespace.local.three",
	"type": "record"
  }

Golang code snippet for sending the data to Confluent Cloud:

func MethodPOCAvro() {

	type ComplexType struct {
		ID   int    `json:"id"`
		Name string `json:"name"`
	}

	sampleSchema := `{
		"doc": "Sample schema to help you get started...",
		"fields": [
		  {
			"doc": "The int type is a 32-bit signed integer.",
			"name": "id",
			"type": "int"
		  },
		  {
			"doc": "The string is a unicode character sequence.",
			"name": "name",
			"type": "string"
		  }
		],
		"name": "my_sample_schema_three",
		"namespace": "com.mycorp.mynamespace.local.three",
		"type": "record"
	  }`
	topic := "sample_topic_003"
	kafkaSchemaRegistryURL := "https://xxx.gcp.confluent.cloud"

	/*** Start registering schemas in the confluent schema registry ***/
	schemaRegistryClient := srclient.CreateSchemaRegistryClient(kafkaSchemaRegistryURL)
	schemaRegistryClient.SetCredentials("xxx", "xxx")

	schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
	if schema == nil {
		log.Print("Schema not found registering now ")
		schema, err = schemaRegistryClient.CreateSchema(topic, sampleSchema, srclient.Avro, false)
		if schema != nil {
			log.Print("Register successful ")
		}
		if err != nil {
			panic(fmt.Sprintf("Error creating the schema  %s", err))
		}
	}

	// Init producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "xxx.confluent.cloud:9092",
		"sasl.mechanisms":   "PLAIN",
		"security.protocol": "SASL_SSL",
		// "debug":                               "all",
		"sasl.username":                       "xxx",
		"enable.ssl.certificate.verification": "false",
		"sasl.password":                       "xxx"})


	time.Sleep(1 * time.Second)

	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))
	log.Println("Schema ID ", schema.ID())

	// 3) Serialize the record using the schema provided by the client,
	// making sure to include the schema id as part of the record.
	newComplexType := ComplexType{ID: 1, Name: "Gopher"}
	value, _ := json.Marshal(newComplexType)
	log.Println("value ", value)

	native, _, _ := schema.Codec().NativeFromTextual(value)
	log.Println("native ", native)

	valueBytes, _ := schema.Codec().BinaryFromNative(nil, native)
	log.Println("valueBytes ", valueBytes)

	var recordValue []byte
	recordValue = append(recordValue, byte(0))
	recordValue = append(recordValue, schemaIDBytes...)
	recordValue = append(recordValue, valueBytes...)
	key, _ := uuid.NewUUID()
	log.Println("recordValue ", recordValue)

	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic: &topic, Partition: kafka.PartitionAny},
		Key: []byte(key.String()), Value: recordValue}, nil)

	if err != nil {
		fmt.Fprintf(os.Stderr, "Exception while producing the message.\n")
	}

	k := p.Flush(200)
	log.Print("Pushed ", k)
}
  • After the program execution I expect value.id and value.name, but I only get a value prefixed with junk characters.


  • I have even verified the bytes being passed on to Confluent Cloud. For the above program, the following is the console output:
Schema not found registering now 
Register successful 

Schema ID  100022
value  [123 34 105 100 34 58 49 44 34 110 97 109 101 34 58 34 71 111 112 104 101 114 34 125]
native  map[id:1 name:Gopher]
valueBytes  [2 12 71 111 112 104 101 114]
recordValue  [0 0 1 134 182 2 12 71 111 112 104 101 114]
Pushed 1


  • Console output of my local setup
Schema ID  1
value  [123 34 105 100 34 58 49 44 34 110 97 109 101 34 58 34 71 111 112 104 101 114 34 125]
native  map[id:1 name:Gopher]
valueBytes  [2 12 71 111 112 104 101 114]
recordValue  [0 0 0 0 1 2 12 71 111 112 104 101 114]
Pushed 1

Notes:

  • If you notice, the first few bytes differ because of the schema ID.
  • The schema ID returned from my local setup is a single digit, but from Confluent Cloud I got a 6-digit schema ID.
  • Do I need to consider any headers while generating the wire format here? (See the sketch below.)
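
For what it's worth, that prefix is not junk: it is the Confluent wire-format header, one magic byte (0) followed by the 4-byte schema ID. 100022 is 0x000186B6, which is exactly the [0 1 134 182] that follows the leading zero in the Confluent Cloud output, while schema ID 1 yields [0 0 0 1] locally. A plain console consumer shows those bytes as garbage; a schema-aware consumer strips them and decodes the rest, roughly like this:

package consumer

import (
	"encoding/binary"
	"fmt"

	"github.com/riferrei/srclient"
)

// decodeValue strips the 5-byte wire-format header and decodes the Avro payload.
func decodeValue(client *srclient.SchemaRegistryClient, value []byte) (interface{}, error) {
	schemaID := binary.BigEndian.Uint32(value[1:5]) // bytes 1..4 carry the schema ID
	schema, err := client.GetSchema(int(schemaID))
	if err != nil {
		return nil, fmt.Errorf("get schema %d: %w", schemaID, err)
	}
	native, _, err := schema.Codec().NativeFromBinary(value[5:])
	if err != nil {
		return nil, err
	}
	return native, nil // e.g. map[id:1 name:Gopher]
}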

improve handling errors from schema registry api

I want to use something like this:

var srcErr srclient.Error
schema, err := c.schemaRegistryClient.GetLatestSchema(topicName)
if err != nil && errors.As(err, &srcErr) && srcErr.Code != 40401 {
	return err
}

If Code == 40401, the schema needs to be created.

I sent PR #59 to solve it.

CreateMockSchemaRegistryClient gives no codec

Hey there,

Loving this library so far, really like that you put thought in using interfaces and mocks, can't express how happy I am that you added those.

One thing I'm struggling with though is that in the following example Codec() is nil.

I'm expecting to be able to unit-test the example code, but the Codec is never set or supplied. Am I missing something obvious? Or am I meant to exclude the codec stuff for testing purposes only? I hope not.

Yours sincerely,

Alex

Schema name used for avro serialized data

Hi,

I would like to use the client to communicate with our Apache Kafka (MSK) cluster, which works differently from the Confluent variants.
The main difference is that there is no direct relation with the schema server in the producing process; only the name of the schema that was used while producing a message is added/sent as metadata:

event-type: test.user_activities
schema-id: test/user_activities/v0.0.1/schema.avsc
content-type: application/x-avro-json

{ -- avro json serialized data according to schema-id --}

There is a Python package which can be used with this Kafka cluster, but I prefer to use Golang.
A snippet from the Python producer, to make it clearer:

data = {"message": "Hello world!"}
topic = "helloworld"

async def produce():
    for _ in range(5):
        # Serialize the data with APPLICATION_X_AVRO_BINARY
        metadata = await stream_engine.send(
            topic, value=data, event_type="hello_world.test", schema_id="example/hello_world/v0.0.1/schema.avsc"
        )
        print(f"Message sent: {metadata}")

        # Serialize the data with APPLICATION_X_AVRO_JSON
        metadata = await stream_engine.send(
            topic,
            value=data,
            event_type="hello_world.test",
            schema_id="example/hello_world/v0.0.1/schema.avsc",
            serialization_type=consts.APPLICATION_X_AVRO_JSON,
        )
        print(f"Message sent: {metadata}")

So if someone reads this: would this be possible with this package? Or are there other Golang packages that are more suitable?
Or am I really stuck with using Python? ;-)

Compatibility

It seems that there is a breaking change in the Schema Registry API posted by Confluent. Older APIs do not allow schemaType in the payload of a POST request to create a schema. Since these APIs do not support unknown fields, what are possible solutions for those who want to use srclient with older schema registries?

I am leaving this open for ideas on how best to tackle compatibility while progressing the client as Confluent updates the API.

Ref. #11 #19

Can't create schema with .avsc file

Following the example code, I create a schema when the Schema Registry doesn't contain it.
The code looks like:

if schema == nil {
    src := srclient.CreateSchemaRegistryClient("http://127.0.0.1")
    schema, err := src.GetLatestSchema(topic)
    if schema == nil {
        schemaBytes, _ := ioutil.ReadFile("test.avsc")
        schema, err = src.CreateSchema(topic, string(schemaBytes), srclient.Avro)
        if err != nil {
            logger.FatalPrintf("Error creating the schema %s", err)
        }
    }
}

The test.avsc file

{
  "type": "record",
  "namespace": "customer",
  "name": "customer_v1",
  "doc": "Test schema",
  "fields": [
    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
  ]
}

Using this code, I got the following error from the Schema Registry server:

INFO   [2022-02-21 10:35:12.886] [dw-36955] c.h.r.s.w.ConfluentSchemaRegistryCompatibleResource -  registerSchema for [test-go] is [{}]
ERROR  [2022-02-21 10:35:12.892] [dw-36955] c.h.r.s.w.ConfluentSchemaRegistryCompatibleResource -  Encountered error while adding subject [test-go]
com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class com.hortonworks.registries.schemaregistry.webservice.ConfluentSchemaRegistryCompatibleResource$SchemaString), n
ot marked as ignorable (one known property: "schema"])
 at [Source: (String)"{"schema":"{   \"type\": \"record\",   \"namespace\": \"customer\",   \"name\": \"customer_v1\",   \"fields\": [     {\"name\": \"first_name\", \"type\": \"string\"},     {\"name\": \"last_name\
", \"type\": \"string\"},     {\"name\": \"age\", \"type\": \"int\"},     {\"name\": \"height\", \"type\": \"float\"},     {\"name\": \"weight\", \"type\": \"float\"},     {\"name\": \"automated_email\", \"type\": \"
boolean\", \"default\": true}   ] }","schemaType":"AVRO","references":[]}"; line: 1, column: 462] (through reference chain: com.hortonworks.registries.schemaregistry.webservice.ConfluentSchemaRegistryCompatibleResour
ce$SchemaString["schemaType"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:843)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1206)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1597)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1575)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:294)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3214)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3182)
        at com.hortonworks.registries.schemaregistry.webservice.ConfluentSchemaRegistryCompatibleResource.schemaStringFromJson(ConfluentSchemaRegistryCompatibleResource.java:425)
        at com.hortonworks.registries.schemaregistry.webservice.ConfluentSchemaRegistryCompatibleResource.lambda$registerSchemaVersion$1(ConfluentSchemaRegistryCompatibleResource.java:361)
        at com.hortonworks.registries.schemaregistry.webservice.BaseRegistryResource.handleLeaderAction(BaseRegistryResource.java:76)
        at com.hortonworks.registries.schemaregistry.webservice.ConfluentSchemaRegistryCompatibleResource.registerSchemaVersion(ConfluentSchemaRegistryCompatibleResource.java:342)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
        at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
        at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)
        at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)

Removing SchemaType Broke Request

Ref #11 #12

I am not sure if there was a case where the Schema Registry that @alok87 was using had not been updated to the latest API, but by removing schemaType, all schemas are now assumed to be AVRO, which breaks PROTOBUF requests.

We should revert PR #12 and come to a better understanding of the case where schemaType is not available on the API.

Confluent Schema Registery API Doc
https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions

Using JSON with schema registry

I'm using JSON for data transfer over Kafka. It works great, but then I wanted to add Schema Registry to validate the JSON.
This is where I'm hoping someone could help me resolve my issue.

I see there is an AVRO example and a Protobuf example, so I looked at the AVRO example and tried it with JSON, just replacing the codec with JsonSchema. I'm not able to get it to work; I get the error: jsonschema: invalid jsonType: []uint8

This is the registered schema (I've checked with schema.SchemaType() and schema.Schema())

{
   "$schema": "http://json-schema.org/draft-04/schema#",
   "type": "object",
   "properties": {
     "action": {
       "type": "string"
     }
   },
   "required": [
     "action"
   ]
}

This is what I send and what I get on the consumer side

{"action":"SEND_EMAIL"}

And I've checked that with

string(kafkaMessage.Value[5:])


This is the code that is checking the json schema (I'm using the "github.com/segmentio/kafka-go" library for kafka)

// read the message
kafkaMessage, _ := r.ReadMessage(context.Background())

// print the json contents of the kafka message so I can see if the correct JSON is sent
string(kafkaMessage.Value[5:])

// extract the schema id from the kafka message
schemaID := binary.BigEndian.Uint32(kafkaMessage.Value[1:5])

// get schema with the schema id
schema, err := schemaRegistryClient.GetSchema(int(schemaID))

// print the schema type and the actual schema so I can see that it's pulling the correct schema and if it's the schema I want
fmt.Println(schema.SchemaType().String())
fmt.Println(schema.Schema())

// validate the message with the json schema
err = schema.JsonSchema().Validate(kafkaMessage.Value[5:])
if err != nil {
   // this is where the error "jsonschema: invalid jsonType: []uint8" is thrown
}

Please note this is not a copy-paste of my code, so if there is a syntax error it's only here, not in the actual code.
If you have any idea what could be wrong, or if I'm using the library incorrectly, some info or an example would be great.
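
The "invalid jsonType: []uint8" error suggests the validator returned by schema.JsonSchema() expects already-decoded JSON rather than raw bytes, so a likely fix is to unmarshal the payload first; a sketch under that assumption:

package consumer

import (
	"encoding/json"
	"fmt"

	"github.com/riferrei/srclient"
)

// validateJSON decodes the payload before handing it to the JSON Schema
// validator; passing raw []byte is what triggers "invalid jsonType: []uint8".
func validateJSON(schema *srclient.Schema, payload []byte) error {
	var doc interface{}
	if err := json.Unmarshal(payload, &doc); err != nil {
		return err
	}
	if err := schema.JsonSchema().Validate(doc); err != nil {
		return fmt.Errorf("message does not match schema: %w", err)
	}
	return nil
}

In the consumer above this would be called as validateJSON(schema, kafkaMessage.Value[5:]).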

CreateSchema method

The method returns the latest version of the schema, but the Schema Registry behaves differently: it compares the submitted schema with the existing ones and, if it doesn't find a match, creates a new schema. In that case the answer is correct.
But if it finds a match, the POST request returns the ID of the existing schema, in which case the answer is incorrect, since the existing schema will not necessarily be the latest one.

cp-schema-registry:7.0.1

Allow for custom TLS configuration

Currently when creating a new instance of SchemaRegistryClient, there is no ability to provide custom TLS configuration to the HTTP client.

I notice a similar issue, #22, which looks to provide a mechanism to use a custom HTTP client. This would also be sufficient.
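
The TLS side of this is just a standard http.Client carrying a tls.Config; the open question is only how to hand it to srclient, which is what this issue and #22 ask for. A minimal sketch of building such a client (the CA file path is illustrative):

package registry

import (
	"crypto/tls"
	"crypto/x509"
	"net/http"
	"os"
	"time"
)

// newTLSHTTPClient builds an http.Client that trusts a custom CA bundle.
// Wiring it into srclient depends on a constructor that accepts a custom
// HTTP client, which is what this issue asks for.
func newTLSHTTPClient(caFile string) (*http.Client, error) {
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caPEM)

	return &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}, nil
}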

Bearer Authentication Missing

I need to be able to communicate with a schema registry through use of a Bearer Token, I have created the code and have it in a branch ready for a PR - but don't have permissions to push the branch for a PR.

Protobuf Handling

compiledRegex := regexp.MustCompile(`\r?\n`)

In CreateSchema, newlines are replaced with spaces. This might work when the payload is JSON, as is the case with Avro and JsonSchema, but it does not work for Protobuf.

I'm going to recommend that the schemaType parameter be converted to an enum type and that this data sanitization occur only for Avro and JsonSchema.

Following the example, I get an unknown field "schemaType" response from Schema Registry

Error

panic: 422 : Unrecognized field: schemaType

Program

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io/ioutil"

	"github.com/riferrei/srclient"
)

func main() {
    topic := "loader-datapipe.inventory.products"
    schemaRegistryClient := srclient.CreateSchemaRegistryClient("XXXX:8081")

	schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
	if schema == nil {
		schemaBytes, _ := ioutil.ReadFile("complexType.avsc")
		schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Avro, false)
		if err != nil {
			panic(fmt.Sprintf("Error creating the schema %s", err))
		}
	}
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

    fmt.Printf("schemaIDBytes:%v\n" schemaIDBytes)

}

SRClient fails and terminates application on error from schema registry

I just upgraded to srclient v0.5.0 and my application started to panic on the GetLatestSchema call when the schema does not exist.

In the Schema Registry access log, I see that the return code is 404. The call itself ends with a panic and the application is terminated.

The application code is quite straightforward:

schema, err = srClient.GetLatestSchema(topic)
if err != nil {
  // logic that installs schema in schema registry
}

However, the error handling is not reached as the application fails with this:

panic: runtime error: makeslice: cap out of range
goroutine 7 [running]:
github.com/riferrei/srclient.createError(0xc000438090, 0xc0021d4000, 0xc000438090)
/go/pkg/mod/github.com/riferrei/srclient@v0.5.0/schemaRegistryClient.go:746 +0x73
github.com/riferrei/srclient.(*SchemaRegistryClient).httpRequest(0xc0002ee000, 0xed1c3d, 0x3, 0xc00034a180, 0x33, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/go/pkg/mod/github.com/riferrei/srclient@v0.5.0/schemaRegistryClient.go:637 +0x385
github.com/riferrei/srclient.(*SchemaRegistryClient).getVersion(0xc0002ee000, 0xc000354a00, 0x19, 0xed2b18, 0x6, 0xc0021a4230, 0x0, 0x0)
/go/pkg/mod/github.com/riferrei/srclient@v0.5.0/schemaRegistryClient.go:570 +0x155
github.com/riferrei/srclient.(*SchemaRegistryClient).GetLatestSchema(...)
/go/pkg/mod/github.com/riferrei/srclient@v0.5.0/schemaRegistryClient.go:243

Same application code works correctly with v0.4.0

schema.Codec() will return nil without printing any warning

schemaRegistryClient := srclient.CreateSchemaRegistryClient(schemaRegistryUrl)
for {
	msg, err := c.ReadMessage(-1)
	if err != nil {
		logger.Error("Read Message Failed", zap.Error(err))
		continue
	}
	logger.Info("Get a message", zap.Any("Value", msg.Value), zap.Any("Header", msg.Headers))
	schemaID := binary.BigEndian.Uint32(msg.Value[1:5])
	schema, err := schemaRegistryClient.GetSchema(int(schemaID))
	if err != nil {
		logger.Fatal("Get Schema Failed", zap.Uint32("SchemaID", schemaID), zap.Error(err))
	}
	logger.Info("Schema info", zap.Any("Schema", schema.Schema()))
	content := msg.Value[5:]
	logger.Info("Content", zap.Any("RawData", content))
	native, _, err := schema.Codec().NativeFromBinary(content)
	if err != nil {
		logger.Fatal("Decode message failed!", zap.Error(err))
	}
	fmt.Printf("Here is the message %v\n", native)
}

Hi, when I use this example code from the README, I get the right schema, but when decoding the content, the call schema.Codec().NativeFromBinary(content) panics with this error: panic: runtime error: invalid memory address or nil pointer dereference. Could someone tell me how I could fix it?

The full logs are:

2021-12-24T14:08:24.694+0800	INFO	Get a message	{"Value": "AAAAAUQAAiAxMTM4ODE2NzcwMTM5NDUxAhQxMjM0NTc0NTMxFiowMzE2MDA5MTM4AhAxNjAwOTE1MwICMNSfAigyMDIwLTA1LTI4VDIwOjE2OjQ4WgAYYXdiOHh2cDJza3NnBjE0MgIzDDAwMDAwMQJBAgAMMDAwMDAwAgIAAgQnEAIEJqwCBCcQAjACAAACMgIIMDAwMAIM5oiQ5YqfAgACAAIAAgACAgACAgACAmQoOTg1MTI4MzUzNzA2NTA4OTcwMTIIMDAwOAICMAIwAjEEMDMCAAIx1J8CAgACAAIAAgACAAIEJxACAgACAjACAAIAAhg2OTIxNTY0NTg2OTQAAgAAAjACBGFhAgACAAACAAA2eyJleHRTeXN0ZW1JZCI6ICJBSVBfQVVUTyJ9AAIwAtSfAgIAFjEuNS4wLkZpbmFsCm15c3FsGnFtX3Rlc3RfbXlzcWyoiuGmvV8ACHRydWUceWluZzk5X2Z1bmR0eG4AAhRmdW5kX29yZGVyAAAgbXlzcWwtYmluLjAwMDAwMYbgpO8DAAAAAnICxorhpr1fAA==", "Header": null}
2021-12-24T14:08:24.850+0800	INFO	Schema info	{"Schema": "{\"type\":\"record\",\"name\": ****** }
2021-12-24T14:08:24.850+0800	INFO	Content	{"RawData": "AAIgMTEzODgxNjc3MDEzOTQ1MQIUMTIzNDU3NDUzMRYqMDMxNjAwOTEzOAIQMTYwMDkxNTMCAjDUnwIoMjAyMC0wNS0yOFQyMDoxNjo0OFoAGGF3Yjh4dnAyc2tzZwYxNDICMwwwMDAwMDECQQIADDAwMDAwMAICAAIEJxACBCasAgQnEAIwAgAAAjICCDAwMDACDOaIkOWKnwIAAgACAAIAAgIAAgIAAgJkKDk4NTEyODM1MzcwNjUwODk3MDEyCDAwMDgCAjACMAIxBDAzAgACMdSfAgIAAgACAAIAAgACBCcQAgIAAgIwAgACAAIYNjkyMTU2NDU4Njk0AAIAAAIwAgRhYQIAAgAAAgAANnsiZXh0U3lzdGVtSWQiOiAiQUlQX0FVVE8ifQACMALUnwICABYxLjUuMC5GaW5hbApteXNxbBpxbV90ZXN0X215c3FsqIrhpr1fAAh0cnVlHHlpbmc5OV9mdW5kdHhuAAIUZnVuZF9vcmRlcgAAIG15c3FsLWJpbi4wMDAwMDGG4KTvAwAAAAJyAsaK4aa9XwA="}
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x50 pc=0x66f3c3]

goroutine 1 [running]:
github.com/linkedin/goavro/v2.(*Codec).NativeFromBinary(0xc0000a2280, {0xc00009a005, 0x1a0, 0x1a0})
	/Users/***/go/pkg/mod/github.com/linkedin/goavro/[email protected]/codec.go:344 +0x23

srclient.Schema fields unexported without constructor - makes SchemaRegistryClient unmockable

All members of srclient.Schema are unexported, so it is impossible to populate an instance of srclient.Schema with values. When implementing a mock SchemaRegistryClient, I want to specify the schema instance that should be returned by GetLatestSchema, but I am unable to without resorting to obnoxious reflection tricks.

It's all well and good to create an immutable struct, but how about providing a constructor or a builder of some kind so that it is at least possible to create one for testing. I am reluctant to use the provided MockSchemaRegistryClient because that mock has far too much logic in it, which might include bugs of its own. A mock should be super simple and just allow for the injection of return values for a given method call. And it is easy enough to create such a mock, except that the return value I need to inject cannot be populated because it is immutable and lacks a constructor.

This kind of thing shouldn't be necessary, and wouldn't be if there was a constructor function

	suite.schema = &srclient.Schema{}
	SetUnexportedField(reflect.ValueOf(suite.schema).Elem().FieldByName("id"), 1)
	SetUnexportedField(reflect.ValueOf(suite.schema).Elem().FieldByName("schema"), "MyDomainEvent")
	SetUnexportedField(reflect.ValueOf(suite.schema).Elem().FieldByName("version"), 1)

	suite.srclient = new(MockRegistryClient)
	suite.srclient.On("GetLatestSchema", "MyDomainEvent", false).Return(suite.schema, nil)

Schema not found errors when creating schemas

Hello,

First of all, thank you for this amazing module, it really helped us

We are using it for automating schema registry operations for other teams with Git. Sometimes we've found strange errors when creating schemas, example:
"{"error_code":40403,"message":"Schema 2213 not found"}"

Our Schema Registry architecture is composed of multiple instances for high-availability purposes. Checking the module code, we realized the "CreateSchema" method performs 2 operations: a POST to register the schema, followed by a GET to fetch it back.

Our hypothesis is that, as the HTTP requests are distributed between the different backends:

  1. POST is routed to backend 1 (could be master or not, will redirect to master if not), successfully registering the schema
  2. GET is routed to backend 2 (can't be master), which didn't fetch yet the change from the topic
  3. Schema not found error is returned
  4. Schema is available as it was successfully registered

At least this makes sense to us. I'm super happy to contribute and make code changes, but I'd like to align with you on the approach; we have some things in mind:

  • New method similar to SetTimeout, example "SetCreateGetDelay". If set, it adds a delay between operations (https://github.com/riferrei/srclient/blob/master/schemaRegistryClient.go#L404)
  • New method called "SkipCreateGetSchema". If set, it skips getting the schema, decoupling it
  • Remove / decouple GetSchema from CreateSchema. For us make sense, probably for other usages can impact
  • Any other option proposal

If you need any additional detail, please let me know

Thanks in advance, Guille

Schema Registry retry and check if the schema exists

func (c *cSchemaRegistry) GetLatestSchema(
	subject string, key bool) (*Schema, error) {
	cSchema, err := c.client.GetLatestSchema(subject, key)
	if err != nil {
		return nil, err
	}

	return toSchema(cSchema), nil
}

This call returns an error in both of these cases:

  1. when the schema does not exist, it returns the error
  2. when there was a network issue in making the call, it returns the error.

I want to add a retry mechanism only for the 2nd case. How do I check if the error was for 1 and not 2?
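
A sketch of one way to tell the two cases apart, assuming the srclient.Error type with a Code field discussed in the error-handling issue earlier on this page (40401 being Schema Registry's "subject not found" code); anything that does not match can then be treated as retryable:

package consumer

import (
	"errors"

	"github.com/riferrei/srclient"
)

// isSubjectNotFound reports whether err is Schema Registry's 40401
// "subject not found" error, as opposed to a transient/network failure.
// It assumes the srclient.Error type mentioned above exists in the
// version you are using.
func isSubjectNotFound(err error) bool {
	var srErr srclient.Error
	return errors.As(err, &srErr) && srErr.Code == 40401
}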

Retry support

I had to add retry functionality. It would be nice if this library supported retries.

I understand there is already a cache. But this is needed when schema calls for 1000 tables happen together the first time the process starts. A simple retry is very useful here.

// GetLatestSchemaWithRetry gets the latest schema with some retry on failure
func GetLatestSchemaWithRetry(client *srclient.SchemaRegistryClient,
	topic string, isKey bool, attempts int) (*srclient.Schema, error) {
	for i := 0; ; i++ {
		schema, err := client.GetLatestSchema(topic, isKey)
		if err == nil {
			return schema, nil
		}
		if i >= (attempts - 1) {
			return nil, fmt.Errorf("Failed to get latest schema, err:%v\n", err)
		}
		klog.Warningf("Error fetching latest schema, err:%v\n", err)
		sleepFor := time.Duration((i * 1000) + 1)
		time.Sleep(sleepFor * time.Millisecond)
	}
}

Why does GetLatestSchema invalidate the cache?

Hello @riferrei !

Thank you for this library! it's been very useful and we are hoping to contribute to it!

the go doc says :

In order to ensure consistency, we need
to temporarily disable caching to force
the retrieval of the latest release from
Schema Registry.

AFAIU the java serializer does cache the schema even when looking up the latest schema version

Could you explain a bit more why you decided to invalidate the cache for this use case? We are considering using this library in our producers; however, given the throughput we are aiming for, it doesn't feel right to hit Schema Registry on every call :)

What other method would you use to satisfy that use case?
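
Until the caching behaviour changes, one workaround on the producer side is to resolve the latest schema once (or on a timer) and reuse it; a minimal sketch, noting that GetLatestSchema's signature varies slightly between srclient versions:

package producer

import (
	"sync"

	"github.com/riferrei/srclient"
)

// latestSchemaCache resolves the latest schema for a subject once and then
// reuses it, so Schema Registry is not hit on every produced record.
// Refreshing on a timer or on serialization errors is left to the caller.
type latestSchemaCache struct {
	client  *srclient.SchemaRegistryClient
	subject string

	once   sync.Once
	schema *srclient.Schema
	err    error
}

func (c *latestSchemaCache) get() (*srclient.Schema, error) {
	c.once.Do(func() {
		c.schema, c.err = c.client.GetLatestSchema(c.subject)
	})
	return c.schema, c.err
}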

Version 0.4 is not resolved by go mod download

Description of the problem

I tried to use version 0.4 published by @AtakanColak. Unfortunately, this is what I get when syncing Go modules:

go: errors parsing go.mod:
/xxx/Worskpace/go.mod:7:2: no matching versions for query "v0.4"

Proposed solution

Create a version 0.4.0 which fits go standard practices.

Protobuf Producer Example

Hello,

I am currently facing some issues with trying to create a Kafka Producer for passing in Protobuf data. As a reference point, I tried to edit the Producer example application (since I got the AVRO example working without issues) as follows:

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io/ioutil"

	"github.com/google/uuid"
	"github.com/riferrei/srclient"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

type ComplexType struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {

	topic := "myTopic"

	// 1) Create the producer as you would normally do using Confluent's Go client
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	go func() {
		for event := range p.Events() {
			switch ev := event.(type) {
			case *kafka.Message:
				message := ev
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Error delivering the message '%s'\n", message.Key)
				} else {
					fmt.Printf("Message '%s' delivered successfully!\n", message.Key)
				}
			}
		}
	}()

	// 2) Fetch the latest version of the schema, or create a new one if it is the first
	schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
	schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
	if schema == nil {
		schemaBytes, err := ioutil.ReadFile("example/complexType.proto")
		if err != nil {
			panic(fmt.Sprintf("File not found %s", err))
		}

		schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Protobuf, false)
		if err != nil {
			panic(fmt.Sprintf("Error creating the schema %s", err))
		}
	}
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

	// 3) Serialize the record using the schema provided by the client,
	// making sure to include the schema id as part of the record.
	newComplexType := ComplexType{ID: 1, Name: "Gopher"}
	value, _ := json.Marshal(newComplexType)
	native, _, _ := schema.Codec().NativeFromTextual(value)
	valueBytes, _ := schema.Codec().BinaryFromNative(nil, native)

	var recordValue []byte
	recordValue = append(recordValue, byte(0))
	recordValue = append(recordValue, schemaIDBytes...)
	recordValue = append(recordValue, valueBytes...)

	key, _ := uuid.NewUUID()
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic: &topic, Partition: kafka.PartitionAny},
		Key: []byte(key.String()), Value: recordValue}, nil)

	p.Flush(15 * 1000)

}

The protobuf schema looks like this and can be found in example/complexType.proto:

syntax = "proto3";

package example;

message ComplexType {
    int64 id = 1;
    string name = 2;
}

When I try to run this application, I get the following error message displayed:

>>> go run app_srclient.go
panic: Error creating the schema 422 : Either the input schema or one its references is invalid

goroutine 1 [running]:
main.main()
        <PATH>/app_srclient.go:55 +0x879
exit status 2

In advance, I'd like to thank anyone who could help out or provide examples to solve this issue!

Metrics

Hey @riferrei,

Thanks for your awesome library. I'm using it in my project xk6-kafka, as you already know. I thought it would be a good idea to introduce some metrics into the library functions, so that usage, network traffic, and a few other things can be measured. If I can find some time, I'll probably do that.

Also, I'd be very happy to have your feedback on this.

functional options and linting

Hey there,

I feel motivated to collaborate on the project a bit more and I have 2 suggestions that I'd like feedback on.

The first one concerns linting. I've recently adopted golangci-lint in all my projects and, with some custom config, was able to find loads of improvements in my own code. Would it be an idea to add that here too, to enforce code style and catch common issues (like preallocation of slices, etc.)?

The second suggestion would be an improvement of the constructor method. Instead of just CreateSchemaRegistry and its advanced variant, I'd love to add:

NewSchemaRegistry(url, options...)

With the options being:

  • WithHttpClient(...)
  • WithSemaphoreWeight(...)

This would allow us to expand the options in the future without adding more constructors; furthermore, it's also more in line with Go's convention of using New. We can also set defaults more sensibly.

I'd love to hear your thoughts on this, I might work on a PR to reflect the suggestions.
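
For reference, a self-contained illustration of the functional-options pattern being proposed, using a stand-in Client struct; the WithHttpClient and WithSemaphoreWeight names come from the list above and are not an existing srclient API:

package options

import "net/http"

// Client is a stand-in for the real SchemaRegistryClient; its fields are
// illustrative only.
type Client struct {
	url             string
	httpClient      *http.Client
	semaphoreWeight int64
}

// Option mutates a Client during construction.
type Option func(*Client)

func WithHttpClient(hc *http.Client) Option {
	return func(c *Client) { c.httpClient = hc }
}

func WithSemaphoreWeight(w int64) Option {
	return func(c *Client) { c.semaphoreWeight = w }
}

// NewSchemaRegistry applies sensible defaults first, then any options.
func NewSchemaRegistry(url string, opts ...Option) *Client {
	c := &Client{
		url:             url,
		httpClient:      http.DefaultClient,
		semaphoreWeight: 16,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

Usage would then read NewSchemaRegistry(url, WithHttpClient(myClient), WithSemaphoreWeight(32)).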

https connection to API endpoint

Hi there!

Does srclient support an HTTPS client? I want to integrate with the Confluent Schema Registry, but the URL endpoint is HTTPS, which srclient does not seem to support. Thanks in advance!

Extending Integration Tests (help wanted)

With #86 we now support integration tests both in local Docker and in GitHub Actions, but we need to extend the existing integration tests to cover most of the functionality.

Particular Tests

  • Creating a Schema With References
  • Fetching References of a Schema
  • go.mod version compatibility integration test: version in go.mod file is not updated if we use a newer capability, so we need to test that by building in the oldest version in one of the integration tests
  • Protobuf, #85 can be used for reference

Support Karapace

Currently Karapace requests fail if "references" is set, even if empty: "Unrecognized field: references".
Aiven-Open/karapace#195

In order to support Karapace in srclient, I suggest adding an arg to the CreateSchemaRegistryClient method that allows overriding the default behaviour if needed. The only change would be to add omitempty to the References field in the structs and look at the arg to decide whether the default empty value for References should be set.

Here is my closed PR with code for reference
#68

Expose Http Client

Certain methods of authenticating with the registry (namely service tokens) require access to the HTTP client to add auth headers. Would you be open to exposing the httpClient as HttpClient on SchemaRegistryClient so we can add interceptors and inject the headers we need? I'm open to other approaches as well and will be happy to submit a pull request however you want me to implement this.

Avro support and resolving referenced schemas

I have two issues related to #14

  1. When fetching a schema from Confluent Cloud the schema response contains only the references and not the source definition. Without the source schemas, any deserializer is unable to unmarshal the event data. Is there any recommended process to resolve the references to the source records?

  2. It doesn't seem like the references in an Avro schema are working with this library. When calling schema.References() I get an empty list. See here for the example I followed.

Mocking

The SchemaRegistryClient should either implement an interface so that its functionality can be mocked or it should have the option to accept an http client so the underlying http client can be mocked.

Or both? This would be an alternative to testing using net/http/httptest. Thoughts?
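
Until that lands, a common workaround is for the consuming code to declare its own narrow interface and stub it in tests; a sketch (note that populating *srclient.Schema inside a stub still runs into the unexported-fields issue described earlier on this page, and GetLatestSchema's signature varies between versions):

package myapp

import "github.com/riferrei/srclient"

// schemaFetcher is the narrow slice of the client this code actually needs;
// *srclient.SchemaRegistryClient satisfies it, and so can a test stub.
type schemaFetcher interface {
	GetLatestSchema(subject string) (*srclient.Schema, error)
}

// stubFetcher is a trivial test double returning canned values.
type stubFetcher struct {
	schema *srclient.Schema
	err    error
}

func (s stubFetcher) GetLatestSchema(string) (*srclient.Schema, error) {
	return s.schema, s.err
}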

How to use the schema string

Output:

$ go run schema_get.go
{"type":"record","name":"Envelope","namespace":"datapipe.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"}],"connect.name":"datapipe.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"datapipe.inventory.customers.Envelope"}

Program:

package main

import (
	"fmt"
	"github.com/riferrei/srclient"
)

func main() {
    schemaRegistryClient := srclient.CreateSchemaRegistryClient("https://schema-XX.XX.com")
    s, err := schemaRegistryClient.GetSchema(6)
     if err != nil {
        panic(err)
     }
     fmt.Printf("schema:\n%+v\n\n", s.Schema())
}

The schema here is a string (I mean s.Schema()). I need to construct a Postgres statement from this schema. Parsing this schema is a big pain. Do we have some Golang Debezium structs for the Debezium schema which I can unmarshal into?
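
There is no ready-made Debezium struct in this client, but the schema string is plain Avro JSON, so a small hand-rolled struct is enough to walk the field list; a sketch (the "type" member is kept as raw JSON because it can be a string, a union array, or a nested record):

package schemaparse

import "encoding/json"

// avroField and avroRecord model just enough of an Avro record schema to
// iterate over its fields, e.g. to build a CREATE TABLE statement from them.
type avroField struct {
	Name string          `json:"name"`
	Type json.RawMessage `json:"type"` // string, union array, or nested record
	Doc  string          `json:"doc,omitempty"`
}

type avroRecord struct {
	Type      string      `json:"type"`
	Name      string      `json:"name"`
	Namespace string      `json:"namespace,omitempty"`
	Fields    []avroField `json:"fields"`
}

func parseRecord(schemaJSON string) (*avroRecord, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(schemaJSON), &rec); err != nil {
		return nil, err
	}
	return &rec, nil
}

With the output above, parseRecord(s.Schema()) yields the top-level Envelope record; nested records such as Value still live inside the raw Type of each field and need a second pass.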

Unknown Revision Error

@riferrei I'm assigning and tagging you as it is possible for it to be urgent.

I came across the srclient version number not showing correctly in go.mod files, so I re-released v0.2.0 with a trailing .0, but then I got:

go: downloading github.com/riferrei/srclient v0.2.0
go get github.com/riferrei/srclient: github.com/riferrei/srclient@v0.2.0: verifying module: github.com/riferrei/srclient@v0.2.0: reading https://sum.golang.org/lookup/github.com/riferrei/srclient@v0.2.0: 410 Gone
        server response: not found: github.com/riferrei/srclient@v0.2.0: invalid version: unknown revision v0.2.0

This error blocks me from importing the library. I asked other people to try the link; some got the error, some didn't, so I don't know how severe it is. I suspect it has to do with re-using the version number, so I'll do a small commit, release v0.2.1, and try that way. I will update the results here, but in the meantime I'm going to kindly ask you to remove the v0.2.0 release and tag.

Cheers and sorries.

UPDATE:

I can't reproduce the error, but released v0.2.1 nonetheless as the project needed a .gitignore :)
