linkedin / hoptimator

Multi-hop declarative data pipelines

License: BSD 2-Clause "Simplified" License

Dockerfile 0.18% Makefile 0.69% Shell 0.27% Java 98.85%
brooklin cdc data-pipelines flink kafka kafka-connect

hoptimator's Introduction

Hoptimator

What is Hoptimator?

Hoptimator is an SQL-based control plane for complex data pipelines.

Hoptimator turns high-level SQL subscriptions into multi-hop data pipelines. Pipelines may involve an auto-generated Flink job (or similar) and any arbitrary resources required for the job to run.

How does it work?

Hoptimator has a pluggable adapter framework, which lets you wire up arbitrary data sources. Adapters loosely correspond to connectors in the underlying compute engine (e.g. Flink Connectors), but they may include custom control plane logic. For example, an adapter may create a cache or a CDC stream as part of a pipeline. This enables a single pipeline to span multiple "hops" across different systems (as opposed to, say, a single Flink job).

Hoptimator's pipelines tend to have the following general shape:

                                _________
topic1 ----------------------> |         |
table2 --> CDC ---> topic2 --> | SQL job | --> topic4
table3 --> rETL --> topic3 --> |_________|

The three data sources on the left correspond to three different adapters:

  1. topic1 can be read directly from a Flink job, so the first adapter simply configures a Flink connector.
  2. table2 is inefficient for bulk access, so the second adapter creates a CDC stream (topic2) and configures a Flink connector to read from that.
  3. table3 is in cold storage, so the third adapter creates a reverse-ETL job to re-ingest the data into Kafka.

In order to deploy such a pipeline, you only need to write one SQL query, called a subscription. Pipelines are constructed automatically based on subscriptions.

Quick Start

Prerequisites

  1. docker is installed and docker daemon is running
  2. kubectl is installed and cluster is running
    1. minikube can be used for a local cluster
  3. helm for Kubernetes is installed

Run

  $ make quickstart
  ... wait a while ...
  $ ./bin/hoptimator
  > !intro
  > !q

Subscriptions

Subscriptions are SQL views that are automatically materialized by a pipeline.

  $ kubectl apply -f deploy/samples/subscriptions.yaml

In response, the operator will deploy a Flink job and other resources:

  $ kubectl get subscriptions
  $ kubectl get flinkdeployments
  $ kubectl get kafkatopics

You can verify the job is running by inspecting the output:

  $ ./bin/hoptimator
  > !tables
  > SELECT * FROM RAWKAFKA."products" LIMIT 5;
  > !q

The Operator

Hoptimator-operator is a Kubernetes operator that orchestrates multi-hop data pipelines based on Subscriptions (a custom resource). When a Subscription is deployed, the operator:

  1. creates a plan based on the Subscription SQL. The plan includes a set of resources that make up a pipeline.
  2. deploys each resource in the pipeline. This may involve creating Kafka topics, Flink jobs, etc.
  3. reports Subscription status, which depends on the status of each resource in the pipeline.
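
The third step can be illustrated with a small aggregation rule. This is only a sketch: the enum, the class name, and the rule itself (FAILED wins, READY requires every resource to be ready) are assumptions made for illustration, not the operator's actual status model.

```java
import java.util.List;

// Hypothetical resource states; the real operator may track status differently.
enum PipelineResourceState { PENDING, READY, FAILED }

final class SubscriptionStatusSketch {
  private SubscriptionStatusSketch() {}

  // Derives one Subscription-level state from the states of its resources.
  // Assumed rule: any FAILED resource fails the Subscription, READY requires
  // a non-empty pipeline whose resources are all READY, anything else is PENDING.
  static PipelineResourceState aggregate(List<PipelineResourceState> resources) {
    if (resources.contains(PipelineResourceState.FAILED)) {
      return PipelineResourceState.FAILED;
    }
    boolean allReady = !resources.isEmpty()
        && resources.stream().allMatch(s -> s == PipelineResourceState.READY);
    return allReady ? PipelineResourceState.READY : PipelineResourceState.PENDING;
  }
}
```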

The operator is extensible via adapters. Among other responsibilities, adapters can implement custom control plane logic (see ControllerProvider), or they can depend on external operators. For example, the Kafka adapter actively manages Kafka topics using a custom controller. The Flink adapter defers to flink-kubernetes-operator to manage Flink jobs.

The CLI

Hoptimator includes a SQL CLI based on sqlline. This is primarily for testing and debugging purposes, but it can also be useful for running ad-hoc queries. The CLI leverages the same adapters as the operator, but it doesn't deploy anything. Instead, queries run as local, in-process Flink jobs.

hoptimator's People

Contributors

hoganedwardchu, huiwang1020, jogrogan, katiewaddell, ryannedolan, stevenewald

hoptimator's Issues

CLI logs are too verbose

The CLI and operator currently use slf4j-simple with default configuration. This is too verbose for the CLI, as Flink generates a ton of logs.

!test command

We need a way to test SQL statements in hoptimator-cli (and esp in integration tests) beyond failing if they crash. Borrowing from dbt, we can consider a test as passing if it returns zero results (but doesn't crash). This would allow us to write SQL that e.g. verifies that two tables contain the same data, or e.g. verifies that a column doesn't have malformed values.

Tests would look something like:

-- test that a table has no null values
0: hoptimator> !test SELECT X FROM DB.FOO WHERE X IS NULL

This is tricky because most tables we encounter are unbounded streams, so we don't know when we've exhausted an entire table. If the example above involved a Kafka topic, the query would hang or time out. In that case, we may "PASS" a test, even if subsequent malformed data arrives a second later. Moreover, from an implementation perspective, it may be difficult to distinguish between "this job crashed from some error" (FAIL) vs "this job timed out without finding malformed values" (PASS).

An additional complexity here is that hoptimator's planner does not support LIMIT clauses. When you run something like SELECT * FROM DB.FOO LIMIT 5 in the CLI, the LIMIT part is handled by Calcite's Enumerable calling convention -- not by Hoptimator's "pipeline" calling convention. This is because it generally makes no sense to have a pipeline with limited data. Thus, we may wish to !test a query that includes a LIMIT, but there will be no way to plan such a pipeline. One possible way forward here is to extend FlinkIterable with a limit, and leverage DataStream.executeAndCollect(limit). Interestingly, we only need one result to FAIL a test (since that would be a non-zero number of rows) so !test could conceivably just always use limit=1.
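
The pass/fail rule described above could look roughly like the following. This is a minimal sketch with hypothetical names (`SqlTestOutcome`, `SqlTestRunnerSketch`). It assumes query results are exposed as a plain `Iterator`, and it leaves out timeout handling for unbounded streams. Only the first row is inspected, which mirrors the limit=1 idea.

```java
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical outcome type; a crash is kept distinct from a genuine FAIL.
enum SqlTestOutcome { PASS, FAIL, ERROR }

final class SqlTestRunnerSketch {
  private SqlTestRunnerSketch() {}

  // Evaluates a dbt-style test: zero rows means PASS, at least one row means FAIL.
  // Checking hasNext() once is equivalent to running the query with limit=1.
  static SqlTestOutcome evaluate(Supplier<Iterator<?>> query) {
    try {
      Iterator<?> rows = query.get();
      return rows.hasNext() ? SqlTestOutcome.FAIL : SqlTestOutcome.PASS;
    } catch (RuntimeException e) {
      // The query itself blew up, so report ERROR rather than FAIL.
      return SqlTestOutcome.ERROR;
    }
  }
}
```

Keeping ERROR separate from FAIL addresses the "crashed" vs "found malformed values" distinction. Telling a timeout apart from an empty result would still need extra signalling from the job.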

Expose "hints" to the planner

#37 adds "subscription hints", which are passed to the control plane via templates. This is useful for things like kafka.numPartitions which are specific to the control plane.

For more advanced use-cases, we need the planner (i.e. PipelineRel etc) to have access to "hints" as well. This would enable users to influence the output (Flink) SQL generated for a pipeline. For example, hints could be used to dynamically enable/disable specific planner rules.

Calcite has a concept of "contexts" which might be useful here. See: https://calcite.apache.org/javadocAggregate/org/apache/calcite/plan/Context.html

Command-line arguments for hoptimator-operator

Currently hoptimator-operator accepts one argument, the path to a Calcite "model file". Remaining properties are hard-coded, including the K8s namespace ("default"). We need a way to provide a few additional command-line arguments, e.g. --namespace my-namespace, along with sensible defaults.

Additionally, individual controllers/reconcilers may need to be configured with their own properties. Controllers are pluggable, so we need a way to pass arbitrary key-value properties via command-line arguments, e.g. --property key=value. Controllers should be able to see these properties via the Operator object, e.g. Operator.properties().get("key").
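
A rough sketch of such argument handling follows, using only the JDK. The class `OperatorOptionsSketch` and its fields are hypothetical and are not part of hoptimator-operator. The flags `--namespace` and `--property key=value` and the "default" namespace come from the description above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for parsed options; not the actual Operator class.
final class OperatorOptionsSketch {
  final String modelPath;
  final String namespace;
  final Map<String, String> properties;

  private OperatorOptionsSketch(String modelPath, String namespace, Map<String, String> properties) {
    this.modelPath = modelPath;
    this.namespace = namespace;
    this.properties = Collections.unmodifiableMap(properties);
  }

  // Parses: <model-file> [--namespace NAME] [--property KEY=VALUE]...
  static OperatorOptionsSketch parse(String[] args) {
    String modelPath = null;
    String namespace = "default"; // sensible default, as today
    Map<String, String> properties = new LinkedHashMap<>();
    for (int i = 0; i < args.length; i++) {
      String arg = args[i];
      if (arg.equals("--namespace")) {
        namespace = requireValue(args, ++i, arg);
      } else if (arg.equals("--property")) {
        String kv = requireValue(args, ++i, arg);
        int eq = kv.indexOf('=');
        if (eq <= 0) {
          throw new IllegalArgumentException("Expected key=value, got: " + kv);
        }
        properties.put(kv.substring(0, eq), kv.substring(eq + 1));
      } else if (arg.startsWith("--")) {
        throw new IllegalArgumentException("Unknown option: " + arg);
      } else if (modelPath == null) {
        modelPath = arg; // the single positional argument
      } else {
        throw new IllegalArgumentException("Unexpected argument: " + arg);
      }
    }
    if (modelPath == null) {
      throw new IllegalArgumentException("Missing model file path");
    }
    return new OperatorOptionsSketch(modelPath, namespace, properties);
  }

  private static String requireValue(String[] args, int index, String option) {
    if (index >= args.length) {
      throw new IllegalArgumentException("Missing value for " + option);
    }
    return args[index];
  }
}
```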

Sub operator ignores changes to hints

Currently, the subscription operator detects changes to the Subscription object by comparing .spec.sql and .status.sql. If these are different, the operator plans a new pipeline and updates the status so that .status.sql == .spec.sql.

This means that the operator will ignore changes to anything besides the SQL. This is problematic if we want to e.g. add partitions to a Kafka topic sink by updating .spec.hints.kafka.numPartitions.

We should also detect changes to .spec.hints and plan a new pipeline. This would let us change how a pipeline is implemented, even if the SQL doesn't change.
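
One way to picture the proposed check is below. This sketch assumes the status would also record the hints that were last planned (a hypothetical .status.hints field, which does not exist today) and that hints are flat string key-value pairs. The class name is made up for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

final class SubscriptionChangeSketch {
  private SubscriptionChangeSketch() {}

  // Returns true when the SQL or the hints in the spec differ from what the
  // status recorded at the last planning pass. Map.equals ignores entry order.
  static boolean needsReplan(String specSql, Map<String, String> specHints,
                             String statusSql, Map<String, String> statusHints) {
    return !Objects.equals(specSql, statusSql)
        || !Objects.equals(orEmpty(specHints), orEmpty(statusHints));
  }

  // Treats missing hints the same as an empty hint map.
  private static Map<String, String> orEmpty(Map<String, String> hints) {
    return hints == null ? Collections.<String, String>emptyMap() : hints;
  }
}
```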

Support multiple input dialects

The CLI, based on sqlline, currently parses ANSI SQL. This turns out to be an unpopular dialect. We should probably use the MySQL dialect by default, since that is what Flink uses. It's unclear whether this is at all possible with sqlline.

At the same time, the planner will need to support the same dialects. Ideally, we'd switch from ANSI SQL to the MySQL dialect everywhere.

Support multiple planner output dialects

The planner is currently hard-coded to output MySQL dialect, which Flink natively understands. In order to target additional runtimes, we need a way to generate SQL in other dialects. N.B. the planner is largely dialect-agnostic already, until you ask for the plan as a String.

Expose whether a table is being used as a source or a sink

The Catalog API (which includes interfaces from Calcite, along with Database, HopTable, TableResolver, etc), was designed to represent data sources, and was only later leveraged for sinks as well. This happens to work because Flink Connectors tend to be symmetrical. For example, the Flink Kafka connector can be used to both read from and write to Kafka using the same configuration, and the same table definition DDL. It's easy to write an adapter that uses such a Flink Connector, and the resulting tables can be used as either sources or sinks.

OTOH, not all Flink Connectors are symmetrical. Occasionally, the configuration (and thus, the DDL) for a Flink Connector is different when used as a source or sink. Often, a Connector supports one mode but not the other. More importantly, the Resources the planner generates may need to be different in either case. This means it would be valuable for an adapter to do one thing when generating resources for a source table, and another for a sink table.

In particular, we would like to add ACL support, which would involve generating Access Control Entries. Naturally, the ACE is different when a table is used as a source or a sink. Trivially, a sink table needs WRITE permission, whereas a source table only needs READ permission. If we view ACEs as a type of Resource, we essentially want to generate different Resources depending on how the table is being used.

Some ideas

  1. We could limit this complexity to Resources only. This appears to neglect the asymmetrical Flink Connector problem, but technically Flink jobs are Resources. If we can solve this for Resource generation, we 1) have an easy way to address the ACL issue, and 2) have a hard way to address asymmetrical connectors. It's unclear how hard, but it should be possible?
  2. We could add ResourceProvider.additionalSinkResources() with a default of empty list. This would keep the ResourceProvider functional interface simple, but would give a hook for implementers that need sink-specific resources.
  3. We could change HopTable to carry around sink-specific resources in addition to the existing Collection<Resource>. This could be empty unless the ResourceProvider has additionalSinkResources() overloaded (or some other mechanism provides these sink-specific resources). This would mean that any HopTable could be used as either source or sink, since the additional sink-specific resources would also be there if the planner needs them.
  4. The planner would need to "implement" the additional sink-specific resources in insertInto(sink) etc. Basically: sink.additionalSinkResources().forEach(x -> resource(x));
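
Ideas 2 through 4 can be sketched together. The types below are simplified stand-ins (resources are plain strings, and `SketchResourceProvider` and `SketchPlanner` are hypothetical names), so the signatures should not be read as the real ResourceProvider or planner API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Stand-in for ResourceProvider: still a functional interface, because the
// sink-specific hook has a default implementation.
interface SketchResourceProvider {
  Collection<String> resources(String table);

  // Default: no extra resources are needed when the table is used as a sink.
  default Collection<String> additionalSinkResources(String table) {
    return Collections.emptyList();
  }
}

final class SketchPlanner {
  private SketchPlanner() {}

  // Collects the resources for a table, adding the sink-specific ones only
  // when the table is the target of an insert.
  static List<String> plan(SketchResourceProvider provider, String table, boolean usedAsSink) {
    List<String> planned = new ArrayList<>(provider.resources(table));
    if (usedAsSink) {
      planned.addAll(provider.additionalSinkResources(table));
    }
    return planned;
  }
}
```

Existing providers written as lambdas would keep working unchanged, while an ACL-aware adapter could override additionalSinkResources() to add a WRITE entry.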

Support generating multiple YAML documents from one template

A template like SqlJob.yaml.template can include multiple documents, separated by ---. This could be useful if one Resource needs to be implemented with multiple Kubernetes objects. For example, a SqlJob could be implemented as a FlinkDeployment and a ConfigMap.

However, the operator currently treats each Resource as a single document, and makes one call to the Kubernetes API for each. Instead, the operator should:

  1. Generate yaml by applying templates, as it does now.
  2. Split the output yaml into separate documents.
  3. Apply each document individually.

It is probably insufficient to just String.split on ---, as this sequence may appear in the template in an escaped string. Instead, the YAML parser should be used to parse the YAML into a list of documents.
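
To show why a plain String.split falls short, here is a JDK-only stand-in that treats only a line consisting of --- as a separator, so a --- inside a quoted string on another line is left alone. It is not a YAML parser: the class name is hypothetical, and it ignores cases such as content following --- on the same line. A real parser (for example SnakeYAML's Yaml.loadAll) remains the more robust option suggested above.

```java
import java.util.ArrayList;
import java.util.List;

final class YamlDocumentSplitter {
  private YamlDocumentSplitter() {}

  // Splits rendered template output into documents at lines that contain
  // nothing but "---" (plus optional trailing whitespace).
  static List<String> split(String yaml) {
    List<String> documents = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (String line : yaml.split("\n", -1)) {
      if (line.matches("---\\s*")) {
        flush(current, documents);
      } else {
        current.append(line).append('\n');
      }
    }
    flush(current, documents);
    return documents;
  }

  // Emits the buffered document unless it is blank, then resets the buffer.
  private static void flush(StringBuilder current, List<String> documents) {
    if (!current.toString().trim().isEmpty()) {
      documents.add(current.toString());
    }
    current.setLength(0);
  }
}
```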

hoptimator-cli hangs/times-out for unbounded queries

When you enter a query with ... LIMIT N, hoptimator-cli will wait for N results and then continue gracefully. But if the LIMIT is omitted, or if there are fewer than N rows available, the query will hang waiting for results. This would be fine, except Ctrl-C fails to cancel the command and the underlying Flink job. The process must be killed externally, or we must wait for a predetermined timeout. For now, we wait for 20 seconds and then cancel the job, dropping any buffered results on the floor.

Ideally, the job should continue to run until:
a) N results are ready, and thus the iterator is closed, or
b) you hit Ctrl-C.
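
The desired loop might be sketched as follows. It assumes results arrive on a BlockingQueue fed by the running job and that some Ctrl-C handler sets a cancellation flag. Both are assumptions for illustration, as is the class name. Polling with a short timeout lets the loop notice cancellation without blocking forever, and already-collected rows are returned rather than dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class BoundedResultCollector {
  private BoundedResultCollector() {}

  // Collects rows until `limit` rows are available or `cancelled` is set.
  static <T> List<T> collect(BlockingQueue<T> rows, int limit, AtomicBoolean cancelled) {
    List<T> results = new ArrayList<>();
    try {
      while (results.size() < limit && !cancelled.get()) {
        // Wake up periodically so a cancellation request is seen promptly.
        T row = rows.poll(100, TimeUnit.MILLISECONDS);
        if (row != null) {
          results.add(row);
        }
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt and return whatever was collected so far.
      Thread.currentThread().interrupt();
    }
    return results;
  }
}
```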

Support no-op Resources

Currently, Resource objects returned by a ResourceProvider are expected to match up with a template file, e.g. SqlJob.yaml.template. As described in Issue #34, this will cause the operator to create exactly one K8s object. In #34 we aim to support generating multiple K8s objects from a single Resource, but we additionally should support generating zero K8s objects from a Resource. This will enable Adapters to provide no-op Resources, i.e. Resources that do not cause corresponding K8s objects to be created.

This will be useful in several scenarios:

  • An Adapter wants to provide Resources that some other Adapter will implement, without requiring a hard dependency on other Adapters.
  • An Adapter wants to provide an optional Resource and expects the end user to optionally provide a template.
  • An Adapter wants !mermaid etc to render metadata that doesn't correspond to actual K8s objects.
  • An end user wants to use an Adapter in an environment that doesn't support a specific K8s object.

N.B. it is insufficient to merely support empty templates, as this would create a scenario wherein Adapter updates may break existing deployments. If an Adapter provides a new Resource type but does not provide a template for it, the end user would need to add their own empty template to explicitly ignore the new Resource. Or, if the Adapter does include an empty template, the end user would need to carefully configure their build to prevent this empty template from overwriting their own custom templates, or templates from other Adapters.

For example, an Adapter update may provide a new Acl resource, expecting some other Adapter to implement it. In this case, the Adapter can either 1) bundle an empty template file, and possibly overwrite an end-user's Acl template, or 2) not include a template file, and expect the end-user to add one themselves.

Neither is a good user experience, so I think we need to support Resources with missing templates. Such Resources won't result in a K8s object being created, and won't result in any "template not found" error.
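
A minimal sketch of that behavior: a missing template yields an empty result instead of an error, so the caller simply creates nothing for that Resource. The class name, the in-memory template map, and the {{name}} placeholder syntax are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

final class NoOpAwareRenderer {
  private final Map<String, String> templates;

  // Templates are keyed by file name, e.g. "SqlJob.yaml.template".
  NoOpAwareRenderer(Map<String, String> templates) {
    this.templates = templates;
  }

  // Renders the template for a Resource kind, or returns empty when no
  // template exists, which marks the Resource as a no-op.
  Optional<String> render(String resourceKind, String name) {
    String template = templates.get(resourceKind + ".yaml.template");
    if (template == null) {
      return Optional.empty(); // no K8s object and no "template not found" error
    }
    return Optional.of(template.replace("{{name}}", name));
  }
}
```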
