trendyol / go-dcp-elasticsearch

A Go implementation of a Couchbase-to-Elasticsearch connector based on DCP.

License: MIT License

Languages: Go 65.75%, Dockerfile 21.37%, Makefile 1.66%, Shell 11.23%
Topics: couchbase, dcp, elasticsearch, go, golang, elasticsearch-connect-couchbase

go-dcp-elasticsearch's Introduction

Go Dcp Elasticsearch

A Go implementation of Elasticsearch Connect Couchbase.

Go Dcp Elasticsearch streams documents from Couchbase through the Database Change Protocol (DCP) and writes them to Elasticsearch indices in near real time.

Features

  • Lower resource usage and higher throughput (see Benchmarks).
  • Custom routing support (see Example).
  • Updating multiple documents for a single DCP event (see Example).
  • Handling of different DCP events such as expiration, deletion, and mutation (see Example).
  • Support for compressing Elasticsearch request bodies.
  • Configurable batching: maximum batch size, maximum batch byte size, and batch ticker duration.
  • Scaling up and down with custom membership algorithms (Couchbase, KubernetesHa, Kubernetes StatefulSet, or Static; see examples).
  • Easily manageable configuration.

Benchmarks

The benchmark used 1,001,006 Couchbase documents, because a dataset of that size makes the difference between the batching behavior of the two packages easier to observe. Both connectors ran with the default configuration of Java Elasticsearch Connect Couchbase.

Package | Time to Process Events | Elasticsearch Indexing Rate (/s) | Average CPU Usage (cores) | Average Memory Usage
Go Dcp Elasticsearch (Go 1.20) | 50s | [chart] | 0.486 | 408MB
Java Elasticsearch Connect Couchbase (JDK15) | 80s | [chart] | 0.31 | 1091MB
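To put the table in throughput terms, the sketch below derives average documents per second and simple ratios from the reported figures only (1,001,006 documents, 50s vs 80s, 0.486 vs 0.31 cores, 408MB vs 1091MB). It performs no measurement of its own, and treating the run time as the full processing window is an assumption.

```go
package main

import "fmt"

// Document count used in the benchmark above.
const documents = 1_001_006

// result holds one row of the benchmark table.
type result struct {
	name     string
	seconds  float64 // time to process all events
	cpuCores float64 // average CPU usage in cores
	memoryMB float64 // average memory usage in MB
}

// docsPerSecond is the average processing rate over the whole run.
func docsPerSecond(r result) float64 {
	return documents / r.seconds
}

func main() {
	goRun := result{"Go Dcp Elasticsearch", 50, 0.486, 408}
	javaRun := result{"Java Elasticsearch Connect Couchbase", 80, 0.31, 1091}

	for _, r := range []result{goRun, javaRun} {
		fmt.Printf("%s: %.0f docs/s\n", r.name, docsPerSecond(r))
	}
	fmt.Printf("speed-up: %.2fx\n", javaRun.seconds/goRun.seconds)
	fmt.Printf("memory ratio (Java/Go): %.2fx\n", javaRun.memoryMB/goRun.memoryMB)
	fmt.Printf("CPU ratio (Go/Java): %.2fx\n", goRun.cpuCores/javaRun.cpuCores)
}
```

With these inputs it reports roughly 20,020 vs 12,513 documents per second, a 1.60x speed-up, and about 2.67x less memory for the Go connector, alongside about 1.57x higher average CPU. Multiplying average cores by run time gives similar total CPU time for both (about 24.3 vs 24.8 core-seconds), assuming the averages cover the whole run.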

Example

Struct Config

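package main

import (
	dcpelasticsearch "github.com/Trendyol/go-dcp-elasticsearch"
	"github.com/Trendyol/go-dcp-elasticsearch/config"
	"github.com/Trendyol/go-dcp-elasticsearch/couchbase"
	"github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"

	dcpConfig "github.com/Trendyol/go-dcp/config"
)

// mapper converts a DCP event into Elasticsearch bulk actions:
// mutations are indexed; deletions and expirations become deletes.
func mapper(event couchbase.Event) []document.ESActionDocument {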
	if event.IsMutated {
		e := document.NewIndexAction(event.Key, event.Value, nil)
		return []document.ESActionDocument{e}
	}
	e := document.NewDeleteAction(event.Key, nil)
	return []document.ESActionDocument{e}
}

func main() {
	connector, err := dcpelasticsearch.NewConnectorBuilder(config.Config{
		Elasticsearch: config.Elasticsearch{
			CollectionIndexMapping: map[string]string{
				"_default": "indexname",
			},
			Urls: []string{"http://localhost:9200"},
		},
		Dcp: dcpConfig.Dcp{
			Username:   "user",
			Password:   "password",
			BucketName: "dcp-test",
			Hosts:      []string{"localhost:8091"},
			Dcp: dcpConfig.ExternalDcp{
				Group: dcpConfig.DCPGroup{
					Name: "groupName",
					Membership: dcpConfig.DCPGroupMembership{
						Type: "static",
					},
				},
			},
			Metadata: dcpConfig.Metadata{
				Config: map[string]string{
					"bucket":     "checkpoint-bucket-name",
					"scope":      "_default",
					"collection": "_default",
				},
				Type: "couchbase",
			},
		},
	}).
		SetMapper(mapper).
		Build()
	if err != nil {
		panic(err)
	}

	defer connector.Close()
	connector.Start()
}
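The mapper contract above (one DCP event in, zero or more actions out) is what makes updating multiple documents for a single DCP event possible. The stdlib-only sketch below models that contract with hypothetical Event and Action types, not the library's couchbase.Event or document.ESActionDocument, to show one mutation fanning out to two indices. The index names are illustrative assumptions.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a DCP event.
type Event struct {
	Key       []byte
	Value     []byte
	IsMutated bool
}

// Action is a hypothetical stand-in for an Elasticsearch bulk action.
type Action struct {
	Type  string // "index" or "delete"
	Index string
	ID    string
	Body  []byte // nil for deletes
}

// fanOutMapper writes each mutation to a primary index and to a
// hypothetical "products-audit" index; deletions and expirations
// remove the document from the primary index only.
func fanOutMapper(e Event) []Action {
	id := string(e.Key)
	if e.IsMutated {
		return []Action{
			{Type: "index", Index: "products", ID: id, Body: e.Value},
			{Type: "index", Index: "products-audit", ID: id, Body: e.Value},
		}
	}
	return []Action{{Type: "delete", Index: "products", ID: id}}
}

func main() {
	ev := Event{Key: []byte("p1"), Value: []byte(`{"n":1}`), IsMutated: true}
	for _, a := range fanOutMapper(ev) {
		fmt.Println(a.Type, a.Index, a.ID)
	}
}
```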

File Config

Default Mapper

Configuration

Dcp Configuration

See go-dcp for the DCP configuration options.

Elasticsearch Specific Configuration

Variable | Type | Required | Default | Description
elasticsearch.collectionIndexMapping | map[string]string | yes | - | Defines which Couchbase collection's events are written to which index.
elasticsearch.urls | []string | yes | - | Elasticsearch connection URLs.
elasticsearch.typeName | string | no | - | Elasticsearch index type name.
elasticsearch.batchSizeLimit | int | no | 1000 | Maximum number of messages in a batch; when it is exceeded, a flush is triggered.
elasticsearch.batchTickerDuration | time.Duration | no | 10s | Interval at which the batch is flushed automatically, so messages do not wait in the batch too long.
elasticsearch.batchByteSizeLimit | int, string | no | 10mb | Maximum batch size in bytes; when it is exceeded, a flush is triggered.
elasticsearch.maxConnsPerHost | int | no | 512 | Maximum number of connections that may be established per host.
elasticsearch.maxIdleConnDuration | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration.
elasticsearch.compressionEnabled | boolean | no | false | Enables request body compression; useful for large messages, but CPU usage may increase.
elasticsearch.concurrentRequest | int | no | 1 | Number of concurrent bulk requests.
elasticsearch.disableDiscoverNodesOnStart | boolean | no | false | Disables node discovery when the client is initialized.
elasticsearch.discoverNodesInterval | time.Duration | no | 5m | Interval for periodic node discovery.
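elasticsearch.batchByteSizeLimit accepts either an integer or a string such as 10mb. The sketch below shows one way such a value could be normalized to a byte count. The helper name parseByteSize, the accepted suffixes, and the choice of 1024-based multiples are assumptions for illustration, not the connector's actual parsing rules.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseByteSize is a hypothetical helper that converts an int or a string
// such as "10mb" into bytes. It assumes 1024-based units (1kb = 1024 bytes).
func parseByteSize(v any) (int, error) {
	switch t := v.(type) {
	case int:
		return t, nil
	case string:
		s := strings.ToLower(strings.TrimSpace(t))
		// Longer suffixes first, because "10mb" also ends with "b".
		units := []struct {
			suffix string
			factor int
		}{
			{"gb", 1 << 30}, {"mb", 1 << 20}, {"kb", 1 << 10}, {"b", 1},
		}
		for _, u := range units {
			if strings.HasSuffix(s, u.suffix) {
				n, err := strconv.Atoi(strings.TrimSpace(strings.TrimSuffix(s, u.suffix)))
				if err != nil {
					return 0, fmt.Errorf("invalid byte size %q: %w", t, err)
				}
				return n * u.factor, nil
			}
		}
		// No suffix: treat the string as a plain byte count.
		return strconv.Atoi(s)
	default:
		return 0, fmt.Errorf("unsupported byte size type %T", v)
	}
}

func main() {
	for _, v := range []any{"10mb", 1048576, "512kb"} {
		n, err := parseByteSize(v)
		fmt.Println(v, n, err)
	}
}
```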

Exposed metrics

Metric Name | Description | Labels | Value Type
elasticsearch_connector_latency_ms | Time taken to add an event to the batch. | N/A | Gauge
elasticsearch_connector_bulk_request_process_latency_ms | Time taken to process a bulk request. | N/A | Gauge

You can also use all of the DCP-related metrics documented in go-dcp. They are injected automatically, so no extra setup is needed.

Contributing

Go Dcp Elasticsearch is always open to direct contributions. For more information, please check our Contribution Guideline document.

License

Released under the MIT License.

go-dcp-elasticsearch's People

Contributors

abdulsametileri, ademekici, alihanyalcin, burhanelgun, canerpatir, emrekosen, emretanriverdi, erayarslan, erdincozdemir, erkanerkisi, henesgokdag, mhmtszr, oguzyildirim, ramazan, uatmaca, ykursadkaya

go-dcp-elasticsearch's Issues

Expose DiscoverNodesOnStart and DiscoverNodesInterval

If we configure fewer node URLs than the cluster actually has, our requests are always forwarded to only those nodes. We want to use the other nodes too.

We need to expose the DiscoverNodesOnStart and DiscoverNodesInterval settings; this only requires passing them to the elasticsearch.NewClient function.

Support for partial index update

We started migrating from CBES to go-dcp-elasticsearch. We're stuck on projects where we need to do a partial index update.

Do you have any plans to support partial index updates? If this feature makes sense to you as well, we are open to contributing to it with your support.
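In the Elasticsearch Bulk API, a partial update is an update action line followed by a {"doc": ...} line that carries only the changed fields. The sketch below builds such an NDJSON pair with encoding/json; buildPartialUpdate is a hypothetical helper, not part of the connector.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// updateMeta is the metadata object of a bulk "update" action.
type updateMeta struct {
	Index string `json:"_index"`
	ID    string `json:"_id"`
}

// buildPartialUpdate returns two NDJSON lines for the Bulk API: the update
// action metadata and a "doc" body holding only the fields to change.
func buildPartialUpdate(index, id string, fields map[string]any) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends '\n' after each value
	if err := enc.Encode(map[string]updateMeta{"update": {Index: index, ID: id}}); err != nil {
		return nil, err
	}
	if err := enc.Encode(map[string]any{"doc": fields}); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	body, err := buildPartialUpdate("products", "p1", map[string]any{"price": 10})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}
```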

bug: latency metric stuck

go-dcp version: v1.1.9
golang: 1.20

We use the exposed metric "cbgo_elasticsearch_connector_bulk_request_process_latency_ms_current" to measure the delay. Sometimes one of the pods gets stuck; the problem is resolved when the pod is restarted.

discussion: If actions are nil, should we ack or not?

actions := c.mapper(e)

for i := range actions {
    c.bulk.AddAction(ctx, e.EventTime, actions[i], e.CollectionName)
}

In this code, ctx.Ack() is called only when actions are not nil. When actions are nil or empty, Ack() is never called. Because of this, malformed or invalid documents are processed again, so we could change this implementation. Let's discuss.
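One possible shape of the proposal is to acknowledge the event immediately when the mapper returns no actions, so the checkpoint can move past documents that will never produce a write. The sketch below uses hypothetical function types (mapperFunc, ackFunc, addFunc) in place of the connector's mapper, ctx.Ack, and bulk.AddAction; it only illustrates the control flow under discussion.

```go
package main

import "fmt"

// action is a hypothetical placeholder for an Elasticsearch action.
type action struct{ id string }

type (
	mapperFunc func(key string) []action
	ackFunc    func()
	addFunc    func(a action)
)

// process maps one event. If nothing is produced (nil or empty slice),
// the event is acknowledged right away instead of being left un-acked.
// Otherwise the actions go to the bulk, which is assumed to ack after
// a successful flush.
func process(key string, m mapperFunc, ack ackFunc, add addFunc) {
	actions := m(key)
	if len(actions) == 0 {
		ack()
		return
	}
	for _, a := range actions {
		add(a)
	}
}

func main() {
	acked := 0
	skipAll := func(string) []action { return nil }
	process("bad-doc", skipAll, func() { acked++ }, func(action) {})
	fmt.Println("acked:", acked)
}
```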

Do not terminate application on write exception

Is your feature request related to a problem? Please describe.
The application terminates when we receive an error while writing to the target data source.

Describe the solution you'd like
The action to take when an error occurs could be defined as a function, for example:
.SetExceptionCallbackFunction()

feature: expose elasticsearch action results as metric

We need to expose Elasticsearch action metrics on the Prometheus endpoint.

Example metrics:
es_action_success_total
es_action_error_total
es_action_total

or

es_action_total{result="success/error", action="delete/index"}
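The labeled variant es_action_total{result, action} needs one counter per (result, action) pair. A production version would typically use a Prometheus CounterVec; the stdlib-only sketch below just shows the bookkeeping with a mutex-guarded map, and actionCounter is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// labels identifies one es_action_total series.
type labels struct {
	result string // "success" or "error"
	action string // "index" or "delete"
}

// actionCounter counts bulk action results per label pair.
type actionCounter struct {
	mu     sync.Mutex
	counts map[labels]int
}

func newActionCounter() *actionCounter {
	return &actionCounter{counts: make(map[labels]int)}
}

// inc records one action result; safe for concurrent use.
func (c *actionCounter) inc(result, action string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[labels{result, action}]++
}

// get returns the current count for a label pair (0 if never seen).
func (c *actionCounter) get(result, action string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[labels{result, action}]
}

func main() {
	c := newActionCounter()
	c.inc("success", "index")
	c.inc("success", "index")
	c.inc("error", "delete")
	fmt.Printf("es_action_total{result=%q, action=%q} %d\n", "success", "index", c.get("success", "index"))
}
```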

Do not use any config library; accept the Config struct as a parameter

bug: wrong flush message condition

The batch byte-size limit that triggers flushMessages is checked incorrectly: len(b.batch) >= b.batchByteSizeLimit compares the number of items in the batch slice, not the batch size in bytes.

File: bulk.go

if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
	b.flushMessages()
}
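A fix is to track the accumulated byte size of the batch alongside the message count and compare that against batchByteSizeLimit. The sketch below is a minimal stand-in for the bulk type in bulk.go; the field names mirror the snippet above, but batchByteSize, add, and the flushes counter are assumptions for illustration.

```go
package main

import "fmt"

// bulk is a minimal stand-in for the connector's bulk type.
type bulk struct {
	batch              [][]byte
	batchSize          int // number of messages in the batch
	batchByteSize      int // total bytes of the messages in the batch
	batchSizeLimit     int
	batchByteSizeLimit int
	flushes            int // counts flushes, for illustration only
}

func (b *bulk) add(msg []byte) {
	b.batch = append(b.batch, msg)
	b.batchSize++
	b.batchByteSize += len(msg)
	// Compare accumulated bytes with the byte limit, not len(b.batch).
	if b.batchSize >= b.batchSizeLimit || b.batchByteSize >= b.batchByteSizeLimit {
		b.flushMessages()
	}
}

func (b *bulk) flushMessages() {
	// A real implementation would send b.batch to the Bulk API here.
	b.flushes++
	b.batch = nil
	b.batchSize = 0
	b.batchByteSize = 0
}

func main() {
	b := &bulk{batchSizeLimit: 1000, batchByteSizeLimit: 10}
	b.add([]byte("123456"))
	b.add([]byte("7890"))
	fmt.Println("flushes:", b.flushes)
}
```

Here the second message brings the batch to 10 bytes and triggers a flush; with the original condition, len(b.batch) would be 2 and no flush would happen until 10 messages had accumulated.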

CompressRequestBody Config

Is your feature request related to a problem? Please describe.
We need to provide CompressRequestBody config to users.

CB Document Key to JSON encoding problem

Description
When a Couchbase document key contains a special character such as '"', the Elasticsearch Bulk API returns HTTP 400 because the request body is not valid JSON.

Example DCP Event
{"delete":{"_index":"report-task","_id":"testqweq1""}}

Expected behavior
The JSON must be validated or escaped before the request is sent to the Elasticsearch Bulk API.
