
Bandar-Log: Monitoring Tool


Intro

Bandar-Log makes it possible to monitor the flow throughput of data sources and processing components that are part of data ingestion and ETL pipelines.

A typical ETL pipeline places processing logic between data sources, which adds delay, i.e. "resistance", that should be measured. For example:

  • how many events a Spark app processes per minute, compared to how many events arrive in the Kafka topics
  • how many unprocessed events currently sit in the Kafka topics
  • how much time has passed since the last aggregation was processed

Collected metrics can be sent to a monitoring service such as Datadog.

Typical ETL pipelines accept incoming data and comprise a chain of processing components with a certain flow geometry:

  1. Incoming data arrives at a certain rate, which we call IN.

    Example: Kafka topics that accept incoming messages.

  2. ETL components pull data from one data source and put it into another. This consuming rate is called OUT.

    Examples: Spark Streaming jobs, Hive/Presto aggregations that pull a portion of data from one table and aggregate it into another, and replicators that mirror data from one data source to another.

Bandarlog is a standalone service that tracks IN (incoming rate) and OUT (consuming rate) between two or more data sources.

In addition, it measures LAG, defined as LAG = IN - OUT. For example, if 10,000 events arrived and 9,400 were consumed, LAG = 600.

The exact semantics of the IN, OUT, and LAG metrics depend on the specific data sources and the contracts that Bandarlog expects (see Metrics).

Why Bandar-Log?

  1. Easy to use. Create your own Bandar-Log in 10 minutes; just follow the Start Bandar-Log in 3 steps section.
  2. Battle-tested. Bandar-Log has run on real-time big data pipelines for a long time and has proven to be a straightforward and stable component.
  3. Supported. Bandar-Log is running in production right now, so we care about its stability and new features.
  4. No need to modify your apps. Bandar-Log is a separate application that monitors metrics from the outside.
  5. Easy to extend and add custom data sources.

Getting Started

See How to Start Bandar-Log in 3 steps.

Bandar-Log concepts

  • Data source config
  • Bandarlog
  • Metric
  • Reporter

Data source config

A data source is an abstraction over a persistence component that can provide or store data.

In Bandar-Log, a data source is represented by a configuration object called a data source config.

A data source config specifies driver/connection properties like host, username, password, etc. A data source config can be shared between multiple data sources.

Kafka data source

Kafka data source configuration:

kafka-config {                   # kafka configuration id (can be any id)
  brokers = 1.1.1.1:9092         # default list of brokers
}

example-bandarlog {
  connector = "kafka-config"     # reference to the particular kafka config
}

SQL data source

SQL data source configuration:

sql-data-source-config {             # data source configuration id (can be any id)
  host = "example.host.com"          # data source host
  port = "5433"                      # data source port
  dbname = "dbname"                  # data source database name
  username = "username"              # data source username
  password = "password"              # data source password 
  schema = "schema"                  # data source schema  
  use-ssl = true                     # data source SSL mode flag (by default = true)
  max.pool.size = 10                 # connection pool size (by default = 10) 
  connection.timeout.ms = 60000      # connection timeout in ms (by default = 60000)
}

aws-glue-source-config {
  region = "region"                  # aws region
  dbname = "database"                # database name
  access.key = "accesskey"           # access key provided by AWS account
  secret.key = "secretaccesskey"     # secret key provided by AWS account
  fetch.size = 10                    # maximum number of partitions to return in a single response
  segment.total.number = 10          # total number of segments (non-overlapping regions of a table's partitions); maximum value is 10
  maxwait.timeout.seconds = 60       # maximum wait time in seconds for all parallel requests to complete
}

Supported data source versions:

  • Kafka (version >= 0.10.2)
  • SQL
    • Vertica (compatible with vertica driver 6.0.0)
    • Presto (compatible with presto driver 0.181)
    • AWS Glue Data Catalog (compatible and tested with aws-java-sdk-glue 1.11.388)
      * you can easily add new data sources

Bandarlog

A bandarlog is a unit of data-flow monitoring for one data source or between several data sources.
Each bandarlog, depending on its type, has one or more connectors: objects that reference a specific data source config. A bandarlog monitors the flow between the linked data sources.

For now there are two supported bandarlog types:

  • SQL - measures the performance of specific ETL component(s) that read data from a SQL-compliant data source and write data to SQL-compliant data source(s). To use the Glue connector, you need a metadata table, for example one created by the AWS Glue crawlers. Crawlers connect to a source or target data store, determine the schema, automatically compute statistics and register partitions, and then create a metadata table in the AWS Glue Data Catalog.
  • Kafka - measures the performance of a specific Kafka consumer and the incoming rate.

SQL connectors are divided into IN and OUT connector types with a one-to-many relation (one IN connector and several OUT connectors):

IN - the connector for the input data source. The IN metric (input rate) is fetched from it.
OUT - a list of connectors for the output data sources. The OUT metrics (output rates) are fetched from them.

You can use the IN or OUT connector separately, according to your requirements (as in the quick start examples).

SQL connector configuration:

in-connector {                               
  type = "presto"                   # data source type (vertica, presto)
  config-id = "presto-config"       # reference to the data source config id
  tag = "presto-tag-name"           # reporting tag value
}
out-connectors = [{                         
  type = "vertica"                  # data source type (vertica, presto)
  config-id = "vertica-config"      # reference to the data source config id
  tag = "vertica-tag-name"          # reporting tag value
}]

The Bandar-Log app accepts a list of bandarlog units that work in parallel. One Bandar-Log instance can run everything you need.

bandarlogs {               # bandarlogs list, every bandarlog should be inside it (can't be renamed)
    <bandarlog-1> {        # bandarlog unit (can be renamed to any name)
        ...
    }
    ...
    
    <bandarlog-n> {
        ...
    }
}

Bandarlogs are isolated, so any connection or semantics issue affecting one bandarlog won't affect the others.

Each bandarlog has several mandatory properties:

  1. Enabled - a flag to enable/disable the bandarlog unit

enabled = true

  2. Bandarlog type - kafka or sql, according to the data source

bandarlog-type = "kafka"

  3. Column type - for the sql bandarlog (see the SQL section)

column-type = "timestamp"

  4. Data source & Connector - see the Data sources section.

  5. Metrics - the list of metrics that should be calculated and reported (see Metric).

metrics = ["IN", "OUT", "LAG"]

  6. Reporters - see the Reporter section.

  7. Scheduler - specifies the bandarlog execution schedule.

scheduler {
  delay.seconds = 0              # delay in seconds before the bandarlog is first executed
  scheduling.seconds = 60        # time in seconds between bandarlog executions
}

  8. Tables/Topics (according to the bandarlog type) - each metric is calculated and reported for each table/topic in the list.

SQL tables:

tables = [                                                 # example config for tables when column-type = datetime
  {
    in-table = "in_table_1"                                # table name for the IN metric
    in-columns = ["year=yyyy", "month=MM", "day=dd"]       # <column>=<format> pairs for the IN metric
    out-table = "out_table_1"                              # table name for the OUT metric
    out-columns = ["year=yyyy", "month=MM", "day=dd"]      # <column>=<format> pairs for the OUT metric
  },
  {
    in-table = "in_table_n"
    in-columns = ["date=yyyy-MM-dd HH:mm:ss"]
    out-table = "out_table_n"
    out-columns = ["date=yyyy-MM-dd HH:mm:ss"]
  },
  ...
]

tables = [                                                 # example config for tables when column-type = timestamp
  {
    in-table = "in_table_1"                                # table name for the IN metric
    in-columns = ["in_column_1"]                           # column name for the IN metric
    out-table = "out_table_1"                              # table name for the OUT metric
    out-columns = ["out_column_1"]                         # column name for the OUT metric
  },
  ...
]
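
Putting the SQL pieces together, a complete sql bandarlog unit might look like the sketch below (table, column, and config-id names are illustrative; the exact placement of the report and reporters blocks follows the snippets in this README):

bandarlogs {
  example-sql-bandarlog {
    enabled = true
    bandarlog-type = "sql"
    column-type = "timestamp"
    metrics = ["IN", "OUT", "LAG", "REALTIME_LAG"]

    in-connector {
      type = "presto"
      config-id = "presto-config"
      tag = "presto-tag-name"
    }
    out-connectors = [{
      type = "vertica"
      config-id = "vertica-config"
      tag = "vertica-tag-name"
    }]

    tables = [{
      in-table = "in_table_1"
      in-columns = ["batch_id"]
      out-table = "out_table_1"
      out-columns = ["batch_id"]
    }]

    scheduler {
      delay.seconds = 0
      scheduling.seconds = 60
    }
    report {
      prefix = "sql_metrics"
      interval.sec = 180
    }
    reporters = [{
      type = "datadog"
      config-id = "datadog-config"
    }]
  }
}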

Kafka topics:

topics = [                                        
  {
    topic-id = "topic_id"                     # user-friendly topic id, every metric will be tagged with this value
    topic = ["topic_1", "topic_2"]            # kafka topics
    group-id = "group_id"                     # kafka group id
  },
  ...
]
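
Similarly, a complete kafka bandarlog unit might look like the following sketch (ids such as kafka-config, datadog-config, and the topic names are illustrative; the exact placement of the report and reporters blocks follows the snippets in this README):

kafka-config {
  brokers = 1.1.1.1:9092
}

bandarlogs {
  example-kafka-bandarlog {
    enabled = true
    bandarlog-type = "kafka"
    connector = "kafka-config"
    metrics = ["IN", "OUT", "LAG"]

    topics = [{
      topic-id = "events"
      topic = ["topic_1"]
      group-id = "group_id"
    }]

    scheduler {
      delay.seconds = 0
      scheduling.seconds = 60
    }
    report {
      prefix = "kafka_metrics"
      interval.sec = 180
    }
    reporters = [{
      type = "datadog"
      config-id = "datadog-config"
    }]
  }
}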

Metric

Bandar-Log measures three fundamental metrics, IN, OUT, and LAG, whose semantics depend heavily on the bandarlog-type (kafka, sql). The bandarlog object contains a metrics section to specify either all of them or just the required subset.

metrics = ["IN", "OUT", "LAG"]

Kafka

Note

Bandar-Log assumes that the Kafka consumer components to be monitored commit their offsets back to Kafka using the Kafka API.

The following metrics are available for a bandarlog of type kafka:

| Metric | Reporting metric name | Value type | Required params | Description |
|--------|-----------------------|------------|-----------------|-------------|
| IN | *.in_messages | incoming messages (long) | topic | Number of incoming messages across all topic partitions, calculated as the SUM of leader offsets** for all partitions of the topics from the topics list, fetched via the Kafka API (getLatestLeaderOffsets) |
| OUT | *.out_messages | consumed messages (long) | topic, group-id | Number of consumed messages across all topic partitions, calculated as the SUM of consumer offsets** for all partitions, fetched via the Kafka API (getConsumerOffsets) using topic and group-id from the topics list |
| LAG | *.lag | unconsumed messages (long) | topic, group-id | Number of unconsumed messages, calculated as SUM(leader offset - consumer offset)** per topic |

* reporting prefix
** in Kafka, an offset is the position of a message in a partition's ordered log
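
To make the offset arithmetic concrete, here is a minimal Scala sketch of how these three values relate (illustrative only, not Bandar-Log's actual implementation; class and method names are made up):

// Illustrative sketch of the Kafka metric arithmetic described above.
object KafkaMetricsSketch {
  // Latest leader offset and committed consumer offset for one partition.
  final case class PartitionOffsets(leaderOffset: Long, consumerOffset: Long)

  def in(partitions: Seq[PartitionOffsets]): Long =
    partitions.map(_.leaderOffset).sum                         // IN: sum of leader offsets

  def out(partitions: Seq[PartitionOffsets]): Long =
    partitions.map(_.consumerOffset).sum                       // OUT: sum of consumer offsets

  def lag(partitions: Seq[PartitionOffsets]): Long =
    partitions.map(p => p.leaderOffset - p.consumerOffset).sum // LAG = IN - OUT

  def main(args: Array[String]): Unit = {
    val offsets = Seq(PartitionOffsets(1200, 1100), PartitionOffsets(800, 750))
    // prints IN=2000 OUT=1850 LAG=150
    println(s"IN=${in(offsets)} OUT=${out(offsets)} LAG=${lag(offsets)}")
  }
}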

SQL

Note

Bandar-Log assumes:

  1. ETL components use dedicated column(s) to mark and isolate a specific piece of processed data.
  2. The column can be of two types, according to the data type of the partition column: timestamp or datetime.
  3. The semantics of a timestamp column (from here on called batch_id; the name is configurable) is a Unix timestamp in milliseconds (UTC by definition) that marks the moment a piece of data was processed.
  4. This column is fetched with the query SELECT MAX(batch_id) FROM :table.
  5. The semantics of datetime is a date/timestamp (e.g., '2013-01-01' or '2015-04-09 14:07'). There can be several columns of type datetime. Along with each column name, the appropriate format must be provided in the config for parsing the partition values into a date represented in milliseconds. The format follows the Date and Time Patterns of Java's SimpleDateFormat.
  6. These columns are fetched with the query SELECT DISTINCT year, month, day FROM :table (for Presto, with the optimize_metadata_queries=true connection setting).
  7. For the AWS Glue connector, the dedicated column (for example, batch_id) should be a partition column. With a metadata table generated by an AWS Glue crawler, the Glue client can then extract the maximum value of the partition column without scanning the whole table in AWS Athena.
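Assumption 5 above implies a conversion from datetime partition values to epoch milliseconds. Here is a minimal Scala sketch of that conversion, assuming the configured <column>=<format> pairs and Java's SimpleDateFormat (illustrative, not the actual provider code):

import java.text.SimpleDateFormat
import java.util.TimeZone

// Illustrative: combine partition values (e.g. year=2019, month=11, day=25)
// parsed with their configured formats into epoch milliseconds (UTC).
object PartitionTimestampSketch {
  def toMillis(formatToValue: Seq[(String, String)]): Long = {
    val pattern = formatToValue.map(_._1).mkString("-")   // e.g. "yyyy-MM-dd"
    val value   = formatToValue.map(_._2).mkString("-")   // e.g. "2019-11-25"
    val fmt = new SimpleDateFormat(pattern)
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))          // timestamps are UTC by definition
    fmt.parse(value).getTime
  }

  def main(args: Array[String]): Unit =
    println(toMillis(Seq("yyyy" -> "2019", "MM" -> "11", "dd" -> "25"))) // 1574640000000
}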

The following metrics are available for a bandarlog of type sql:

| Metric | Reporting metric name | Value type | Required params | Description |
|--------|-----------------------|------------|-----------------|-------------|
| IN | *.in_timestamp | timestamp (long) | in-table | Timestamp fetched from the in-connector data source using the <table>:<column> pair from the in-table property |
| OUT | *.out_timestamp | timestamp (long) | out-table | Timestamp fetched from the out-connectors data sources using the <table>:<column> pair from the out-table property |
| LAG | *.lag | diff in milliseconds (long) | in-table, out-table | Difference between the IN and OUT timestamps (LAG = IN - OUT) |
| REALTIME_LAG | *.realtime_lag | diff in milliseconds (long) | out-table | Difference between the current UTC timestamp and the OUT timestamp (REALTIME_LAG = System.currentTimeMillis() - OUT) |

* reporting prefix

Reporter

The reporter is an API layer for a specific monitoring service like Datadog.

Reporters configuration:

report {                                           # each metric will be reported using these properties
  prefix = "vertica_metrics"                       # prefix for reported metric names (e.g., vertica_metrics.in_timestamp)
  interval.sec = 180                               # reporting interval in seconds
}

reporters = [                                      # list of reporters, where each metric should be reported
  {
    type = "datadog"                               # reporter type
    config-id = "datadog-config"                   # reference to reporter config
  }
]

Datadog

Currently, we use the Datadog reporter as the single reporter for bandarlog metrics.
In the datadog reporter configuration you can specify the host, metrics prefix, and running interval for the datadog reporter.
We also use Datadog tags to keep metrics data aggregated; see the Reporting Tags section for more details.

Configuration:

datadog-config {                      # datadog reporter config id
  host = null                         # use 'null' for a local datadog agent, or specify a host value
}

Reporting Tags

Tags are a way of adding dimensions to metrics so they can be sliced, aggregated, and filtered.

Default Tags

Kafka

| Metric | Tags |
|--------|------|
| IN | topic:<topic-id> |
| OUT | topic:<topic-id>, group-id:<group-id> |
| LAG | topic:<topic-id>, group-id:<group-id> |

* <topic-id> and <group-id> are placeholders for values from the topics=[...] list inside the kafka bandarlog config

SQL

| Metric | Tags |
|--------|------|
| IN | in_table:<in-table>, in_connector:<in-connector> |
| OUT | out_table:<out-table>, out_connector:<out-connector> |
| LAG | in_table:<in-table>, in_connector:<in-connector>, out_table:<out-table>, out_connector:<out-connector> |
| REALTIME_LAG | out_table:<out-table>, out_connector:<out-connector> |

* <in-table> and <out-table> are table placeholders from the tables=[...] list inside the sql bandarlog config.
<in-connector> and <out-connector> are placeholders for the tag values inside the in/out connector configs.

Environment Tag

env:<environment>
The <environment> value is fetched from the APP_ENVIRONMENT variable; if it is not set, the tag is not reported.

Custom Tags

You can specify your own reporting tags; just add the following config to your bandarlog with the required key-value pairs.

bandarlogs {
  example-bandarlog {
    ...
    tags = [
      {
        key = "<tag_name>"
        value = "<tag_value>"
      }
    ]
    ...
  }
}
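
For example, key = "pipeline" and value = "clickstream" (illustrative names) would attach a pipeline:clickstream tag to every metric reported by this bandarlog.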

Architecture overview


  1. Data source
    Kafka, Vertica, Presto, AWS Glue Data Catalog.

  2. Connector
    API layer over data source.

  3. Metric Provider
    The metric provider calls the connector API to fetch data from the data source and calculates specific business metrics from it. Each metric has an appropriate metric provider according to the bandarlog-type.
    For example, the IN metric for bandarlog-type = "kafka" uses KafkaInMessagesProvider, while for bandarlog-type = "sql" it uses SqlTimestampProvider.

  4. Bandarlog manager
    A bandarlog is a monitoring unit for one or several data sources, responsible for managing its data providers and reporters.

  5. Metric Reporter
    Reporter API for the specific monitoring service like Datadog.

  6. Monitor
    A metrics monitor in the monitoring service.
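
Since custom data sources are meant to be easy to add, the extension surface boils down to the connector and provider layers. Here is a hypothetical Scala sketch of that split (trait and method names are made up for illustration; see the source for the real interfaces):

// Hypothetical sketch of the connector/provider layering described above.
trait Connector {
  // API layer over a data source, e.g. runs SELECT MAX(batch_id) FROM <table>
  def fetchMaxTimestamp(table: String, column: String): Option[Long]
}

trait MetricProvider {
  // Calculates one business metric from values fetched via a connector.
  def provide(): Option[Long]
}

// An OUT-style provider: the latest processed timestamp of an output table.
final class TimestampProvider(connector: Connector, table: String, column: String)
    extends MetricProvider {
  override def provide(): Option[Long] =
    connector.fetchMaxTimestamp(table, column)
}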

Docker image

Published as bandarlog

Build custom image

Publish the bandarlog image to the local machine:

sbt bandarlog/docker:publishLocal

Push the image to a Docker registry:

docker login -u <docker_user> -p <docker_password> <docker_registry>
docker push <image>

License

Bandar-Log is released under the Apache License, Version 2.0

Credits


Contributors

alexey-lipodat, btrofimov, gemelen, maksymskrynnikov, mariiara, rossoha, tomershtrum, yurir


Issues

Rework Datadog reporter and metrics

We could reuse PagerDuty's metrics library and avoid re-implementing the reporter.
This would provide more flexibility in creating and reporting metrics.

Untangle Kafka-related code from Spark

We use the Kafka client and auxiliary classes from Spark, though they could be replaced with the pure Kafka client. This would significantly reduce the dependency list.
We need to use a client that is able to connect to brokers no older than version 0.10.0.2.

Bypass TRAVIS_BUILD_NUMBER if not set

It's hard to import the project into an IDE because the sbt project depends on the value of TRAVIS_BUILD_NUMBER.
If it's not set, the import procedure fails.

Add a bypass for this variable for the development environment, e.g. set it to a constant if the environment variable is not set.

Rework queue metrics facility

At the moment, Bandarlog relies on the semantics of Kafka; however, it could be reworked around a more generalized notion of observing queue-like services.
This should be done to enable support for different providers of a queue/log nature, e.g. Kinesis, SQS, etc.
It could also enable cross-queue monitoring.
Needs to be done before #18 and #19.

Access AWS MSK

We are trying to add a bandarlog for MSK and are facing issues. On the first run we got:

ERROR connectors.KafkaConnector: Cannot obtain leaders offsets for topic:[Set(input_test)], cause java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.

After restarting bandarlog, we started to get:

ERROR connectors.KafkaConnector: Cannot obtain leaders offsets for topic:[Set(input_test)], cause java.util.concurrent.TimeoutException: Futures timed out after [1 second]

The MSK security group has an open port for the bandarlog machine. Any tips on debugging would be helpful.

Thanks

OSSify bandar-log?

@yurir @zsaltys Hi, could you please clarify whether bandar-log could be made a fully OSS project and put under the control of its current contributors?

Add Athena/Glue support

We need to monitor metrics on Athena, so Bandar-log needs Athena/Glue SQL data source support.
Keep in mind the AWS Glue-specific method of obtaining max(batch_id), so as not to do a full scan of the target table.
