raystack / dagger

Dagger is an easy-to-use, configuration over code, cloud-native framework built on top of Apache Flink for stateful processing of real-time streaming data.

Home Page: https://raystack.github.io/dagger/

License: Apache License 2.0

Languages: Java 99.82%, Python 0.04%, Shell 0.14%
Topics: apache-flink, apache-kafka, dataops, framework, influxdb, prometheus, real-time-analytics, real-time-processing, stream-processing

dagger's People

Contributors

ankittw, anukin, arujit, batrov, deepakmarathe, dmitrysmorodinnikov, gauravsinghania, guykhazma, h4rikris, jesrypandawa, karrtikiyer-tw, kevinbheda, kevinbhedag, kn-sumanth, lavkesh, lewisxin, mayankrai09, mayurgubrele, meghajit, nncrawler, prakharmathur82, rajarammallya, ravisuhag, rohilsurana, seahleyi, shreyansh228, sravankorumilli, sumitaich1998, tygrash, vianhazman


dagger's Issues

Add support for Avro data

Currently, Dagger only supports SerDe for Protobuf encoded messages.

  • Avro SerDe is already supported in Flink; add support for enabling it (see the sketch below).
  • Apart from SerDe, post-processors, pre-processors, and functions such as ElementAt and HashTransformer also rely on the DynamicMessage APIs. These need to work for Avro data as well.
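A minimal sketch of what enabling Flink's built-in Avro SerDe could look like for a Kafka-backed stream (the topic, bootstrap servers, and schema variable below are illustrative assumptions, not Dagger configuration):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;

    // Parse the Avro schema (assumed to come from configuration or a schema registry).
    Schema avroSchema = new Schema.Parser().parse(avroSchemaJson);

    // flink-avro already ships a DeserializationSchema for generic Avro records.
    AvroDeserializationSchema<GenericRecord> avroDeserializer =
            AvroDeserializationSchema.forGeneric(avroSchema);

    // Plug it into the Kafka source in place of the Protobuf deserializer.
    KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("input-topic")
            .setValueOnlyDeserializer(avroDeserializer)
            .build();

The remaining work, as noted above, is wiring this behind a configuration flag and making the DynamicMessage-based processors work with GenericRecord as well.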

feat: support composite type in maps during serialization

Summary
As of now, Dagger does not yet support serializing composite types in map values to Kafka. Both the key and the value are cast to String. This is the relevant piece of code doing this at the moment:

   builder.addRepeatedField(fieldDescriptor,
                    mapEntry.toBuilder()
                            .setKey((String) inputRow.getField(0))
                            .setValue((String) inputRow.getField(1))
                            .buildPartial());

This card is to support serialization of complex map values into the appropriate wire format for Kafka.
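A rough sketch of how a composite map value could instead be built with protobuf's DynamicMessage API (illustration only; rowToMessage is a hypothetical helper that converts a Flink Row into a message using the value field's descriptor):

    import com.google.protobuf.Descriptors;
    import com.google.protobuf.DynamicMessage;
    import org.apache.flink.types.Row;

    // A proto map field is a repeated message with an implicit "key"/"value" entry descriptor.
    Descriptors.Descriptor entryDescriptor = fieldDescriptor.getMessageType();
    Descriptors.FieldDescriptor keyField = entryDescriptor.findFieldByName("key");
    Descriptors.FieldDescriptor valueField = entryDescriptor.findFieldByName("value");

    // Build the composite value as a nested message instead of casting it to String.
    DynamicMessage valueMessage =
            rowToMessage((Row) inputRow.getField(1), valueField.getMessageType());   // hypothetical helper

    DynamicMessage entry = DynamicMessage.newBuilder(entryDescriptor)
            .setField(keyField, inputRow.getField(0))
            .setField(valueField, valueMessage)
            .build();
    builder.addRepeatedField(fieldDescriptor, entry);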

Uniform version across modules

  • Have consistency in versioning across submodules.
  • Take versions from an external file instead of defining them in build.gradle.

feat: Add support for Kafka consumer

Context:
As we primarily work with the Protobuf data format, we have a custom Kafka source implementation. With Flink 1.14 we migrated from the Kafka consumer to the source contract. The source contract works fine for most cases; however, we noticed that for high-throughput topics the jobs do not perform well.

Acceptance Criteria:

  • Add support for the Kafka consumer
  • Enable the Kafka consumer by default
  • Make the Kafka consumer and Kafka source configurable (see the sketch below)
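For context, a sketch of the two ingestion APIs this issue wants to make configurable (the topic, group ids, environment, and protoDeserializer below are illustrative, not Dagger's actual wiring):

    import java.util.Properties;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.types.Row;

    // Legacy Kafka consumer contract (proposed default for high-throughput topics).
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "dagger-consumer");
    FlinkKafkaConsumer<Row> kafkaConsumer =
            new FlinkKafkaConsumer<>("input-topic", protoDeserializer, props);
    DataStream<Row> fromConsumer = env.addSource(kafkaConsumer);

    // Newer source contract (KafkaSource), adopted during the Flink 1.14 migration.
    KafkaSource<Row> kafkaSource = KafkaSource.<Row>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("input-topic")
            .setGroupId("dagger-source")
            .setValueOnlyDeserializer(protoDeserializer)
            .build();
    DataStream<Row> fromSource =
            env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

Exposing a stream-level configuration that selects one of the two paths would satisfy the last criterion.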

feat: Parquet DataSource should provide ability to read multiple GCS buckets for creating multiple streams

As part of this issue, we want to add support for handling multiple streams for the Parquet Data Source.
That is, users should be able to specify multiple GCS URLs. Dagger should create a parquet data source, and hence a data stream for each of these GCS URLs.

This issue is needed so that the user can do joins and other operations with multiple streams on Parquet DataSource similar to KafkaSource.

feat: Handle complex/nested data types from a parquet file in Parquet Data Source: Struct, Repeated Struct, Maps and Timestamp of type SimpleGroup

Dagger has been processing real-time Kafka streams for years. Now, with parquet file processing, we aim to add the capability of performing Dagger operations over historical data, making Dagger a complete solution for data processing from historical to real-time.

As part of this feature, we want to extend #99 and add the capability to read maps as well as timestamps in the format of nested simple groups (seconds + nanos) from the parquet file.

All the current features of Dagger, such as transformers and UDFs, should continue to work on the data. From the perspective of downstream components, they need not know what kind of source produced the data.

ACCEPTANCE CRITERIA:

  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more fields of type struct. THEN: Dagger should process the data from the local parquet file and then exit gracefully. All struct fields should have their value set to null in the Flink row. (No processing can be done for structs as of now.)
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more fields of type repeated struct. THEN: Dagger should process the data from the local parquet file and then exit gracefully. All repeated struct fields should have their value set to null in the Flink row. (No processing can be done for repeated structs as of now.)
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more fields of type map. THEN: Dagger should process the data from the local parquet file and then exit gracefully. Map types should be parsed into an array of key-value Flink rows. A suitable default value should be used when data is not present in the parquet file but is present in the schema.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more timestamp fields in the form of a simple group of seconds + nanos (this differs from #99, where timestamp parsing was supported only in int64/long format). THEN: Dagger should process the data from the local parquet file and then exit gracefully. The timestamp should be parsed into a Flink row of seconds and nanos. A suitable default value should be used when data is not present in the parquet file but is present in the schema.

Out of scope:

  • Enums, repeated enums

  • Message, repeated message

  • Repeated primitives

Dagger test cases are failing in EsResponseHandlerTest when setting the project up locally with ./gradlew clean build

io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInputForPrimitiveData FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureExceptionallyWhenPathDoesNotExists FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleParseExceptionAndReturnInput FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTypeIsFalse FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInput FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleIOExceptionAndReturnInput FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleExceptionAndReturnInput FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInputAsObjectIfTypeIsNotPassedAndRetainResponseTypeIsTrue FAILED
io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleResponseParsingIOExceptionAndReturnInput FAILED

Handle null fields in api payload

Currently, there are no null checks for required params coming from the run job API payload.

Add support to check and handle them with proper error messages.

feat: Conglomerate and update dagger dependencies

Summary
As part of the many ongoing features in Dagger, such as parquet and python-udf, there will be additions to the dependencies. Hence, the intention is to combine all such planned dependencies and add them to the respective module build.gradle files of Dagger, so that the dependencies jar (of the format dagger-core-x.x.x-dependencies.jar) generated by the gradle task ./gradlew dependenciesJar has all the dependencies packaged together.

Proposed solution
This will reduce migration effort later on, when all the feature branches get merged: only the main Dagger image will need to change.

Additional context
N/A

Support for Distributed tracing

Add support for Distributed tracing.

Traces should be able to correlate input and output messages in the case of data aggregation as well.

analyze: Support for python UDFs

Analyze the approach to support python UDFs.

Acceptance Criteria:

  • RFC detailing the approach and the deployment to be followed.

feat: Add ability to read and process complex/nested data types from a parquet file in Parquet Data Source: enums, repeated enums, message, repeated message, repeated primitives

Dagger has been processing real-time Kafka streams for years. Now, with parquet file processing, we aim to add the capability of performing Dagger operations over historical data, making Dagger a complete solution for data processing from historical to real-time.

As part of this feature, we want to extend #99 and add the capability to read repeated types (repeated primitives, repeated enums, and repeated simple groups) and some complex types (enums and nested simple groups).

All the current features of Dagger, such as transformers and UDFs, should continue to work on the data. From the perspective of downstream components, they need not know what kind of source produced the data.

ACCEPTANCE CRITERIA

  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more enum fields. THEN: Dagger should process the data from the local parquet file and then exit gracefully. The enum fields should be added to the Flink row. A suitable default value should be used when data is not present in the parquet file but is present in the schema.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more repeated enum fields (an array of enums). THEN: Dagger should process the data from the local parquet file and then exit gracefully. Each list of enums should be added to the Flink row as a list. A suitable default value should be used when data is not present in the parquet file but is present in the schema.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more nested simple groups (i.e. a simple group within another simple group). THEN: Dagger should process the data from the local parquet file and then exit gracefully. The nested simple groups should be parsed into nested Flink rows. A suitable default value should be used when data is not present in the parquet file but is present in the schema.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more fields of type repeated simple group (i.e. an array of simple groups). THEN: Dagger should process the data from the local parquet file and then exit gracefully. The repeated simple groups should be parsed into an array of Flink rows. A suitable default value should be used when data is not present in the parquet file but is present in the schema.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet, one or more parquet files are provided as input, and the parquet file has a parent simple group containing one or more fields of type repeated primitive (i.e. an array of parquet primitive types like int64, boolean, etc.). THEN: Dagger should process the data from the local parquet file and then exit gracefully. Repeated primitive types should be parsed into an array of Flink rows. A suitable default value should be used when data is not present in the parquet file but is present in the schema.

Out of scope

  • Struct

  • Repeated Struct

  • Maps

  • Timestamp of type SimpleGroup

Add CDC source

Dagger currently only supports Apache Kafka as a data source. Add support for a change data capture (CDC) connector as a source for Dagger.

More details are available in Flink's CDC connectors documentation.

Log column names in case of Kafka and Influx sink

Currently, both the Kafka and Influx sinks log only the data (Row) being sent.

Add support for logging column names along with the data points, similar to the implementation in the log sink.

This will enable users to correlate the data points with column names.

Support for Dynamic Headers in HTTP Post Processor

Currently, in the HTTP Post Processor, users can only specify static headers in the header config.

This issue adds the capability to support any header pattern that can take values from the input messages of the source.

Proposed sample configuration:

{
  "header_pattern": "{\"Header_Key\": \"%s\"}",
  "header_variables": "transaction_id"
}
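With such a configuration, the header could be resolved per message roughly as follows (sketch only; the config values come from the proposal above, while the row lookup helper is hypothetical):

    import java.util.Arrays;

    String headerPattern = "{\"Header_Key\": \"%s\"}";        // from "header_pattern"
    String[] headerVariables = "transaction_id".split(",");   // from "header_variables"

    // Look up each variable's value from the input message's Row (hypothetical helper),
    // then substitute it into the pattern, mirroring how body/endpoint variables are handled.
    Object[] values = Arrays.stream(headerVariables)
            .map(variable -> getFieldFromInputRow(variable))   // hypothetical lookup
            .toArray();
    String resolvedHeader = String.format(headerPattern, values);
    // resolvedHeader -> {"Header_Key": "<value of transaction_id>"}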

feat: Add metrics for Parquet DataSource

For a Dagger running with the Parquet source, some additional metrics might be needed in order to gauge the performance of the source, the file reader behaviour, the checkpointing behaviour, and the split assigner behaviour (files added, files skipped). For the existing metrics, any changes required to keep them consistent across the multiple sources also need to be taken care of.

feat: exclude httpclient dependency from depot in minimalJar

Summary
Currently, for the Parquet Dagger source, depot has been added to minimalJar. Depot comes with its own transitive dependency on org.apache.httpcomponents:httpclient version 4.5.13. However, the dagger-functions module uses org.elasticsearch.client:elasticsearch-rest-client:6.6.1, which has a transitive dependency on org.apache.httpcomponents:httpclient version 4.5.6. This confuses the classloader in the Flink jobmanager at runtime when the job is submitted, as two different modules use two versions of the same dependency, and a LinkageError is thrown.

Proposed solution
To exclude org.apache.httpcomponents:httpclient dependency from depot in build.gradle of dagger-core module.
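A sketch of what the exclusion could look like in dagger-core's build.gradle (the depot coordinates and version below are placeholders):

    dependencies {
        // Drop depot's transitive httpclient so only one version ends up on the classpath.
        implementation('io.odpf:depot:x.x.x') {   // placeholder coordinates/version
            exclude group: 'org.apache.httpcomponents', module: 'httpclient'
        }
    }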

Additional context
The LinkageError stack trace is attached as a screenshot to the original issue.

docs: add documentation for BQ sink in Dagger

Summary

Documentation for BQ sink in Dagger

Proposed solution

This will help users set up the sink as well as debug issues.

Additional context

  1. Add a config document in Dagger

  2. Add a new sink document

  3. Update the sink information across the documentation

Add support for Struct SerDe

Dagger currently doesn’t support SerDe for Struct and repeated Struct data types.

Current behaviour: struct fields are set to null during deserialization, and an empty Object is set during serialization.
Expected behaviour: the fields should be properly parsed so that they can be used in Dagger SQL queries and sent to the sink after serialization.

Add Count Window Transformer

Flink's SQL interface currently doesn't support Count Windows.

Add a transformer that can perform generic count window aggregations.
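Although Flink SQL lacks count windows, the DataStream API supports them, so the transformer could wrap something like the following sketch (the key column, window size, and mergeRows aggregation are illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    DataStream<Row> aggregated = inputStream
            .keyBy(row -> row.getField(0))                     // key column taken from transformer config
            .countWindow(100)                                  // emit one aggregate per 100 records per key
            .reduce((left, right) -> mergeRows(left, right));  // hypothetical aggregation function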

feat: Analyse different strategies and add validation for missing fields in parquet data compared to protobuf schema

Extra fields present in the parquet data but not present in the protobuf schema will be ignored.
However, it might be possible that:

  • there are some fields in the protobuf schema which are missing in the parquet data
  • field names are the same but the data types are different

We need to answer, and solve for, the following:

  1. Should the Parquet Data Source set default values for fields which are not found in the parquet file but are present in the schema? If yes, what should the default value be?
  2. If no defaults are to be set, should the Dagger job fail?

feat: enhance dagger functions to support python UDF

Acceptance Criteria:

  • Add CI to publish compressed Python files to GitHub Packages

  • Create a dagger-py-functions directory at the odpf/dagger root, following the directory structure mentioned in the RFC

  • Zip the entire directory, including data and requirements.txt, and publish it to GitHub Packages

  • Actual Python UDF (in scope: one sample UDF)

Use odpf stencil client

Dagger currently uses com.gojek.stencil, which is being deprecated. Change it to use io.odpf.stencil.

Handle missing configs, extra whitespaces or incorrect config values for Parquet Source

As of now, if the stream config values for the Parquet Data Source, such as SOURCE_PARQUET_FILE_PATHS or SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY, are defined with extra whitespace or with invalid values, a NullPointerException is thrown.

We should ideally throw an explicit exception stating that the config value is invalid. In the case of extra whitespace, the value should be trimmed.
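A minimal sketch of the proposed handling, assuming the raw config value arrives as a comma-separated string (the config key is from this issue; the surrounding accessor is illustrative):

    import java.util.Arrays;

    String rawPaths = streamConfig.get("SOURCE_PARQUET_FILE_PATHS");   // illustrative accessor
    if (rawPaths == null || rawPaths.trim().isEmpty()) {
        throw new IllegalArgumentException(
                "Invalid stream config: SOURCE_PARQUET_FILE_PATHS is missing or empty");
    }
    // Trim whitespace around each path instead of failing later with a NullPointerException.
    String[] parquetFilePaths = Arrays.stream(rawPaths.split(","))
            .map(String::trim)
            .filter(path -> !path.isEmpty())
            .toArray(String[]::new);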

Implement BQ sink in Dagger using ODPF sink

Acceptance Criteria

  • Add Basic BigQuery Sink using the sink-connector APIs.

  • Configure Proto/JSON Serialiser to convert Row into OdpfMessage

  • Call sink APIs to push messages to BQ.

Out of scope:

  • No batching

  • No checkpointing.

Deprecate billing project config from Parquet Stream Source Config

The STREAMS configuration currently has a nested field called SOURCE_PARQUET_BILLING_PROJECT. This was added as part of #99.

This was planned to be used to specify the GCP billing project id for a requester pays bucket.

However, since Dagger will ship with a default local core_site.xml, the billing project can be specified there under the config fs.gs.requester.pays.project.id. For production deployments, the core_site.xml can be injected externally using Terraform.

This card is to deprecate the config.

refactor: rename packages and files in dagger-common serde

Dagger will soon have a new data source for Parquet files. For deserialization of records, it will continue to use the dagger-common module and the handler classes within.

Currently, the package names include keywords like proto and protohandler, which are misleading, as the same handler classes will have methods to parse both Protobuf objects and SimpleGroup objects.

Hence, some of the package and class names inside the dagger-common module need to be refactored to better reflect their responsibility.

This issue is meant to do that. No functional change is required. This will only be a refactor.

feat: Create a Parquet Data Source with ability to read and process primitive data types from a parquet file

Dagger has been processing real-time Kafka streams for years. Now, with parquet file processing, we aim to add the capability of performing Dagger operations over historical data, making Dagger a complete solution for data processing from historical to real-time.

As part of this feature, we want to add a DataSource in Dagger which can read data from a parquet file and send records downstream as Flink Rows. In this issue, we only target reading simple types such as INT, FLOAT, or STRING from the parquet file. Reading nested fields is out of scope for this issue and will be covered by #100.

All the current features of Dagger, such as transformers and UDFs, should continue to work on the data generated by the Parquet Data Source. From the perspective of downstream components, they need not know what kind of source produced the data.

Tasks to be done:

  1. Create a Parquet Data Source and expose configurations. Data Sources should be switchable in Dagger.

  2. Create a Parquet Reader which reads parquet files using row groups and columns.

  3. Create a parser for converting Parquet primitive types → Flink Row types (a rough sketch of reading row groups and parsing primitives follows this list).

  4. Process the parquet files in chronological order
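
A rough sketch of tasks 2 and 3 using the parquet-mr example APIs, reading row group by row group and mapping primitive columns into a Flink Row (the file path and the primitive-only mapping are assumptions for illustration):

    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroup;
    import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.io.ColumnIOFactory;
    import org.apache.parquet.io.MessageColumnIO;
    import org.apache.parquet.io.RecordReader;
    import org.apache.parquet.schema.MessageType;

    try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("file:///tmp/input.parquet"), new Configuration()))) {
        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        PageReadStore rowGroup;
        while ((rowGroup = reader.readNextRowGroup()) != null) {       // task 2: iterate row groups
            RecordReader<Group> recordReader =
                    columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema));
            for (long i = 0; i < rowGroup.getRowCount(); i++) {
                SimpleGroup record = (SimpleGroup) recordReader.read();
                Row row = new Row(schema.getFieldCount());
                // Task 3: map only primitive columns into the Row; complex types stay null here.
                // e.g. row.setField(index, record.getLong(fieldName, 0)) for an int64 column.
            }
        }
    }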

Not in Scope

  1. Checkpointing and state persistence: when the parquet source Dagger is restarted, it should resume processing from the last checkpoint
  2. Corrupt file behaviour

Acceptance Criteria:

  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet and a single local parquet file is provided as input. THEN: Dagger should process the data from the local parquet file and then exit gracefully. Only primitive types will be parsed into their equivalent Java types; other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds + nanos.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet and multiple date-partitioned local folder paths containing multiple parquet files are provided as input. THEN: Dagger should process all the files in chronological order of dates and then exit gracefully. Only primitive types will be parsed into their equivalent Java types; other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds + nanos.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet and multiple hour-partitioned local folder paths containing multiple parquet files are provided as input. THEN: Dagger should process all the files in chronological order of hours and then exit gracefully. Only primitive types will be parsed into their equivalent Java types; other complex types can be empty. Any int64 timestamp fields in the parquet data should be parsed to seconds + nanos.
  • GIVEN: A Dagger job is created. WHEN: The data source is selected as parquet and multiple date/hour partitioned folder paths containing 0 files are provided as input. THEN: Dagger should stop gracefully.

doc: Add documentation+FAQs for Parquet DataSource

Acceptance Criteria

  • Add docs for the new configurations exposed for the Parquet File Source, their meaning, and their default values. Assumptions made, such as time ranges being in UTC with both ends inclusive, should be mentioned here.
  • Add a doc explaining the end-to-end flow of how the source works, as well as how to configure it for a sample use case.
  • Add some FAQs (not comprehensive; cover the most frequent ones).

Add support for JSON data

Currently, Dagger only supports SerDe for Protobuf encoded messages.

  • JSON SerDe is already supported in Flink; add support for enabling it.
  • Apart from SerDe, post-processors, pre-processors, and functions such as ElementAt and HashTransformer also rely on the DynamicMessage APIs. These need to work for JSON data as well.

feat: enhance dagger core to be able to add python UDF

Acceptance criteria:

  • Configuration to enable UDFs.

  • Code changes in Dagger to register UDFs from GCS.

  • Code abstraction in Dagger.

Out of scope

  • CI to publish compressed Python files to GitHub Packages

  • Actual Python UDFs

  • Documentation

Add support for Flink 1.14

Upgrade to the most recent stable Flink version (1.14).
Fix deprecated APIs and do the required refactoring.

Make HashTransformer use common SerDe

Currently, the HashTransformer uses its own implementation for protobuf SerDe.

  • Move SerDe logic from dagger-core to dagger-common.
  • Use common SerDe for both core and HashTransformer.

feat: implement IndexOrderedSplitAssigner

As part of this issue, we want to add support for configuring a split assigner which can assign splits to source readers in an almost-deterministic order as decided by the user.

The order will be decided by the order of file paths specified as part of the StreamConfig SOURCE_PARQUET_FILE_PATHS.
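A rough sketch of the idea against Flink's FileSplitAssigner contract from flink-connector-files (the class name and ordering source come from this issue; the implementation details below are assumptions):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.PriorityQueue;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

    public class IndexOrderedSplitAssigner implements FileSplitAssigner {
        private final PriorityQueue<FileSourceSplit> pendingSplits;

        public IndexOrderedSplitAssigner(List<String> orderedFilePaths, Collection<FileSourceSplit> splits) {
            // Order splits by the index of their file path in SOURCE_PARQUET_FILE_PATHS.
            this.pendingSplits = new PriorityQueue<>(Comparator.comparingInt(
                    (FileSourceSplit split) -> orderedFilePaths.indexOf(split.path().toString())));
            this.pendingSplits.addAll(splits);
        }

        @Override
        public Optional<FileSourceSplit> getNext(String hostname) {
            // Hand out splits in the user-specified order, ignoring reader locality.
            return Optional.ofNullable(pendingSplits.poll());
        }

        @Override
        public void addSplits(Collection<FileSourceSplit> splits) {
            pendingSplits.addAll(splits);
        }

        @Override
        public Collection<FileSourceSplit> remainingSplits() {
            return new ArrayList<>(pendingSplits);
        }
    }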

feat: Create sink-connectors repo and add at least one sink with one abstraction

Context:
As a first task, we want to create a repo which will hold the sink-connector library. This library will be used by both Firehose and Dagger. It might be private in the beginning until it is completed.

Acceptance criteria:

  • Define the abstraction for the interfaces.

  • Implement the InfluxDB sink as a common library, as it is part of both Firehose and Dagger and does not require batching on the Dagger side in the current implementation.

  • The library should provide a way to plug in instrumentation for both Firehose and Dagger.

Out of scope:

Implementation of a sink in Dagger and/or Firehose; this will be done in the next ticket.
