Home Page: https://go-zero.dev

License: MIT License


English | 简体中文

go-stash

go-stash is a high performance, free and open source server-side data processing pipeline that ingests data from Kafka, processes it, and then sends it to ElasticSearch.

go-stash has about 5x the throughput of logstash and is easy to deploy: only one executable file.


Quick Start

Install

cd stash && go build stash.go

Quick Start

  • With binary
./stash -f etc/config.yaml
  • With Docker; make sure the path of the config file is correct.
docker run -d -v `pwd`/etc:/app/etc kevinwan/go-stash

The config.yaml example is as follows:

Clusters:
- Input:
    Kafka:
      Name: go-stash
      Log:
        Mode: file
      Brokers:
      - "172.16.48.41:9092"
      - "172.16.48.42:9092"
      - "172.16.48.43:9092"
      Topic: ngapplog
      Group: stash
      Conns: 3
      Consumers: 10
      Processors: 60
      MinBytes: 1048576
      MaxBytes: 10485760
      Offset: first
  Filters:
  - Action: drop
    Conditions:
      - Key: status
        Value: 503
        Type: contains
      - Key: type
        Value: "app"
        Type: match
        Op: and
  - Action: remove_field
    Fields:
    - message
    - source
    - beat
    - fields
    - input_type
    - offset
    - "@version"
    - _score
    - _type
    - clientip
    - http_host
    - request_time
  Output:
    ElasticSearch:
      Hosts:
      - "http://172.16.188.73:9200"
      - "http://172.16.188.74:9200"
      - "http://172.16.188.75:9200"
      Index: "go-stash-{{yyyy.MM.dd}}"
      MaxChunkBytes: 5242880
      GracePeriod: 10s
      Compress: false
      TimeZone: UTC

Details

input

Conns: 3
Consumers: 10
Processors: 60
MinBytes: 1048576
MaxBytes: 10485760
Offset: first

Conns

  • The number of connections to Kafka. Size it by the number of CPU cores; usually keep it <= the number of CPU cores.

Consumers

  • The number of consumer threads opened per connection. The total is Conns * Consumers, and it should not exceed the number of topic partitions. For example, if the topic has 30 partitions, keep Conns * Consumers <= 30.

Processors

  • The number of threads that process data. Size it by the number of CPU cores; it can be increased moderately. Recommended: Conns * Consumers * 2 or Conns * Consumers * 3, e.g. 60 or 90.
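
The sizing rules above can be sketched in Go (an illustration only; the helper name is hypothetical, not a go-stash API):

```go
package main

import "fmt"

// recommendedProcessors applies the rule of thumb from the docs:
// Processors = Conns * Consumers * factor, where factor is 2 or 3.
func recommendedProcessors(conns, consumers, factor int) int {
	return conns * consumers * factor
}

func main() {
	conns, consumers, partitions := 3, 10, 30
	// The total consumer count must not exceed the topic's partition count.
	fmt.Println(conns*consumers <= partitions) // true
	fmt.Println(recommendedProcessors(conns, consumers, 2)) // 60
}
```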

MinBytes MaxBytes

  • The size range of the data blocks fetched from Kafka, 1MB~10MB by default. If network and I/O capacity allow, they can be raised.

Offset

  • Optional values are first and last; the default is last. first reads from the beginning of the topic, while last reads only newly arriving data.
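
The semantics can be sketched with Kafka's conventional sentinel offsets (earliest = -2, latest = -1); whether go-stash uses these values internally is an assumption, so treat this as an illustration:

```go
package main

import "fmt"

// offsetFor maps the config's Offset option onto Kafka's conventional
// sentinel offsets. This sketches the semantics, not go-stash's code.
func offsetFor(option string) int64 {
	if option == "first" {
		return -2 // start from the beginning of the topic
	}
	return -1 // "last" (the default): only newly produced messages
}

func main() {
	fmt.Println(offsetFor("first"), offsetFor("last")) // -2 -1
}
```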

Filters

- Action: drop
  Conditions:
    - Key: k8s_container_name
      Value: "-rpc"
      Type: contains
    - Key: level
      Value: info
      Type: match
      Op: and
- Action: remove_field
  Fields:
    - message
    - _source
    - _type
    - _score
    - _id
    - "@version"
    - topic
    - index
    - beat
    - docker_container
    - offset
    - prospector
    - source
    - stream
- Action: transfer
  Field: message
  Target: data

- Action: drop

  • Drop flag: data matching the conditions is discarded during processing and never written to ES
  • Each condition specifies a Key field and a Value; the Type field can be contains (substring match) or match (exact match)
  • The Op field joins conditions; it can be and or or

- Action: remove_field

remove_field identifier: the fields to be removed; just list them under Fields

- Action: transfer

transfer field identifier: renames a field; for example, the message field can be redefined as a data field
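
The three filter actions can be sketched on a plain map of fields. This is an illustrative sketch of the semantics described above, not go-stash's actual implementation:

```go
package main

import (
	"fmt"
	"strings"
)

type event map[string]any

// process applies drop, remove_field, and transfer to one event.
func process(e event) (event, bool) {
	// drop: Type "contains" is substring matching, "match" is exact
	// matching; Op "and" requires both conditions to hold.
	status, _ := e["status"].(string)
	typ, _ := e["type"].(string)
	if strings.Contains(status, "503") && typ == "app" {
		return nil, false // dropped, never written to ES
	}
	// remove_field: delete the listed fields.
	for _, f := range []string{"beat", "offset", "_score"} {
		delete(e, f)
	}
	// transfer: rename the message field to data.
	if v, ok := e["message"]; ok {
		delete(e, "message")
		e["data"] = v
	}
	return e, true
}

func main() {
	_, kept := process(event{"status": "503 Service Unavailable", "type": "app"})
	fmt.Println(kept) // false: matched both drop conditions
}
```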

Output

Index

  • Index name. indexname-{{yyyy.MM.dd}} formats by year.month.day; {{yyyy-MM-dd}} also works, or supply your own format

MaxChunkBytes

  • The size of each bulk request submitted to ES; default is 5MB. Adjust according to ES I/O capacity.

GracePeriod

  • Default is 10s. On shutdown, the program uses this window to flush the remaining consumed data before exiting gracefully.

Compress

  • Whether to compress data. Compression reduces the amount of data transferred but adds some processing overhead. Optional values true/false; default is false.

TimeZone

  • Default value is UTC (Coordinated Universal Time)

ES performance write test

Test environment

  • stash servers: 3 machines, 4 cores / 8GB each
  • es servers: 15 machines, 16 cores / 64GB each

Key configuration

- Input:
      Conns: 3
      Consumers: 10
      Processors: 60
      MinBytes: 1048576
      MaxBytes: 10485760
  Filters:
  - Action: remove_field
    Fields:
    - Message
    - source
    - beat
    - fields
    - input_type
    - offset
    - request_time
  Output:
      Index: "nginx_pro-{{yyyy.MM.d}}"
      Compress: false
      MaxChunkBytes: 5242880
      TimeZone: UTC

Average write speed is above 150k records/s.


Acknowledgements

go-stash is powered by go-zero for great performance!

Give a Star! ⭐

If you like or are using this project to learn or start your solution, please give it a star. Thanks!

go-stash's People

Contributors

dependabot[bot], kevwan, mikaelemmmm, sjatsh, sp3c73r2038, wzlove, zmzblink, zzy5066


go-stash's Issues

Error report

{"@timestamp":"2023-03-04T15:00:31.601+08","level":"error","content":"index.go:65 elastic: Error 400 (Bad Request): Validation Failed: 1: this action would add [2] shards, but this cluster currently has [1000]/[1000] maximum normal shards open; [type=validation_exception]\nelastic: Error 400 (Bad Request): Validation Failed: 1: this action would add [2] shards, but this cluster currently has [1000]/[1000] maximum normal shards open; [type=validation_exception]\nelastic: Error 400 (Bad Request): Validation Failed: 1: this action would add [2] shards, but this cluster currently has [1000]/[1000] maximum normal shards open; [type=validation_exception]"}

{"@timestamp":"2023-03-04T15:01:12.190+08","level":"error","content":"writer.go:74 \u0026{validation_exception Validation Failed: 1: this action would add [2] shards, but this cluster currently has [1000]/[1000] maximum normal shards open; false map[] [] [] [] map[] [] \u003cnil\u003e}"}

Which ElasticSearch versions are supported?

Which ElasticSearch versions are supported?

You know, when Elastic releases a new ElasticSearch version, the ElasticSearch REST API changes.
So we should tell clients which ElasticSearch versions go-stash supports, e.g. 7.1, 6.5, 5.5. :)

Writing to elasticsearch 8.2.2 reports this

{"@timestamp":"2022-06-09T17:33:05.261+08:00","level":"error","content":"writer.go:60 elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]"}

is https supported?

The output config is

Output:
    ElasticSearch:
      Hosts:
        - https://127.0.0.1:9200
      Index: "{.event}-{{yyyy-MM-dd}}"
      Username: elastic
      Password: XXXXXXYYYYYY

and i get

➜  stash git:(master) ✗ ./stash -f etc/config.yaml                                 
2022/04/17 01:28:31 stash.go:58 health check timeout: Head "https://127.0.0.1:9200": x509: “Elasticsearch security auto-configuration HTTP CA” certificate is not trusted: no Elasticsearch node available
2022/04/17 01:28:31 {"@timestamp":"2022-04-17T01:28:31.581+08:00","level":"fatal","content":"stash.go:58 health check timeout: Head \"https://127.0.0.1:9200\": x509: “Elasticsearch security auto-configuration HTTP CA” certificate is not trusted: no Elasticsearch node available"}

the official guide is

➜  es curl --cacert http_ca.crt -u elastic https://127.0.0.1:9200            
Enter host password for user 'elastic':
{
  "name" : "34bb1b121a60",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "Y4pyYI5rSg-JzEVphE8ZqA",
  "version" : {
    "number" : "8.1.2",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "31df9689e80bad366ac20176aa7f2371ea5eb4c1",
    "build_date" : "2022-03-29T21:18:59.991429448Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

The ca cert which named http_ca.crt is copied from the docker container.

Since go-stash is a competitor to logstash, I found filebeat's and logstash's configuration from Kibana:

output.elasticsearch:
  hosts: ["<es_url>"]
  username: "elastic"
  password: "<password>"
  # If using Elasticsearch's default certificate 
  ssl.ca_trusted_fingerprint: "<es cert fingerprint>" 
setup.kibana:
  host: "<kibana_url>"

There is a property named "ssl.ca_trusted_fingerprint"; maybe it would be helpful?

illegal_argument_exception ? parameter [_type] [type

{"@timestamp":"2024-06-14T15:32:45.819+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}
{"@timestamp":"2024-06-14T15:32:46.821+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}
{"@timestamp":"2024-06-14T15:32:47.820+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}
{"@timestamp":"2024-06-14T15:32:48.822+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}
{"@timestamp":"2024-06-14T15:32:49.825+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}
{"@timestamp":"2024-06-14T15:32:50.819+08:00","caller":"es/writer.go:64","content":"elastic: Error 400 (Bad Request): Action/metadata line [1] contains an unknown parameter [_type] [type=illegal_argument_exception]","level":"error"}

Is there rate limiting when writing to the target ES?

Thanks for open-sourcing go-stash. One question: is there any rate limiting when writing to the target ES? As far as I can tell there isn't.
During an online migration the target ES is also serving production traffic, and too much write volume may affect other business. So how can rate limiting, or dynamic adjustment of the write rate, be achieved? Are there any recommended solutions? Thanks.

docker image can't use

WARNING: The requested image's platform (linux/arm64/v8) does not match the detected host platform (linux/amd64) and no specific platform was requested

Errors when writing to the target ES

Thanks for open-sourcing go-stash. One question: is there any error handling when writing to the target ES? As far as I can tell there isn't; errors are only logged.
Writes to the target ES during a migration may fail, so what is the recommended practice? Are there any solutions? Thanks.

Your project pulls in 251 open source components and has 1 vulnerability; please upgrade

Detected that kevwan/go-stash pulls in 251 open source components in total, with 1 vulnerability.

Vulnerability title: jwt-go security vulnerability
Affected component: github.com/dgrijalva/[email protected]+incompatible
Vulnerability ID: CVE-2020-26160
Description: jwt-go is a Go implementation of JWT by an individual developer.
jwt-go versions before 4.0.0-preview1 have a security vulnerability: an attacker can bypass intended access restrictions in situations where []string{} is used for m["aud"] (which the specification allows).
Affected range: (∞, 4.0.0-preview1)
Minimum fixed version: 4.0.0-preview1
Import path of the affected component: main@->github.com/dgrijalva/[email protected]+incompatible

There are a few more vulnerabilities; full report: https://mofeisec.com/jr?p=a5495b
