Giter Site home page Giter Site logo

dbt-decodable's Introduction

dbt-decodable

dbt adapter for Decodable.

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

Decodable is a fully managed stream processing service, based on [Apache Flink®] and using SQL as the primary means of defining data streaming pipelines.

Installation

dbt-decodable is available on PyPI. To install the latest version via pip (optionally using a virtual environment), run:

python3 -m venv dbt-venv         # create the virtual environment
source dbt-venv/bin/activate     # activate the virtual environment
pip install dbt-decodable        # install the adapter

Getting Started

Once you've installed dbt in a virtual environment, we recommend trying out the example project provided by decodable:

# clone the example project
git clone https://github.com/decodableco/dbt-decodable.git
cd dbt-decodable/example_project/example/

# Ensure you can connect to decodable via the decodable CLI:
# If you don't have installed the decodable CLI,
# install it following these instructions: https://docs.decodable.co/docs/setup#install-the-cli-command-line-interface
decodable connection list

# Ensure you have a  ~/.dbt/profiles.yml file:
cat ~/.dbt/profiles.yml
dbt-decodable: # this name must match the 'profile' from dbt_project.yml
  outputs:
    dev:
      account_name: <fill in your decodable account name>
      profile_name: default # fill in any profile defined in ~/.decodable/config
      type: decodable
      database: db
      schema: demo
      local_namespace: dbt_demo
  target: dev

# This will launch the example project
dbt run

Note that this dbt adapter ignores the active-profile setting in ~/.decodable/config. You must put the decodable profile you want to use in the ~/.dbt/profiles.yml file into the profile_name setting. The adapter does not support a custom decodable base-url (e.g. for local development or proxies).

Configuring your profile

Profiles in dbt describe a set of configurations specific to a connection with the underlying data warehouse. Each dbt project should have a corresponding profile (though profiles can be reused for different project). Within a profile, multiple targets can be described to further control dbt's behavior. For example, it's very common to have a dev target for development and a prod target for production related configurations.

Most of the profile configuration options available can be found inside the dbt documentation. Additionally, dbt-decodable defines a few adapter-specific ones that can be found below.

dbt-decodable:        # the name of the profile
  target: dev         # the default target to run commands with
  outputs:            # the list of all defined targets under this profile
    dev:              # the name of the target
      type: decodable
      database: None  # Ignored by this adapter, but required properties
      schema: None    # Ignored by this adapter, but required properties

      # decodable specific settings
      account_name: [your account]          # Decodable account name
      profile_name: [name of the profile]   # Decodable profile name
      materialize_tests: [true | false]     # whether to materialize tests as a pipeline/stream pair, default is `false`
      timeout: [ms]                         # maximum accumulative time a preview request should run for, default is `60000`
      preview_start: [earliest | latest]    # whether preview should be run with `earliest` or `latest` start position, default is `earliest`
      local_namespace: [namespace prefix]   # prefix added to all entities created on Decodable, default is `None`, meaning no prefix gets added.

dbt looks for the profiles.yml file in the ~/.dbt directory. This file contains all user profiles.

Supported Features

Materializations

Only table materialization is supported for dbt models at the moment. A dbt table model translates to a pipeline/stream pair on Decodable, both sharing the same name. Pipelines for models are automatically activated upon materialization.

To materialize your models simply run the dbt run command, which will perform the following steps for each model:

  1. Create a stream with the model's name and schema inferred by Decodable from the model's SQL.

  2. Create a pipeline that inserts the SQL's results into the newly created stream.

  3. Activate the pipeline.

By default, the adapter will not tear down and recreate the model on Decodable if no changes to the model have been detected. However, if changes to a decodable stream have been detected, it will be deleted and recreated. We recommend configuring a local_namespace for dbt-managed resources to prevent accidential deletion of streams. Invoking dbt with the --full-refresh flag set, or setting that configuration option for a specific model will cause the corresponding resources on Decodable to be destroyed and built from scratch. See the docs for more information on using this option.

Custom model configuration

A watermark option can be configured to specify the watermark to be set for the model's respective Decodable stream. See the http events example.

A primary_key option can be configured to specify the primary key if the target stream is a change stream. See the group by example.

More on specifying configuration options per model can be found here.

Seeds

dbt seed will perform the following steps for each specified seed:

  1. Create a REST connection and an associated stream with the same name (reflecting the seed's name).

  2. Activate the connection.

  3. Send the data stored in the seed's .csv file to the connection as events.

  4. Deactivate the connection.

After these steps are completed, you can access the seed's data on the newly created stream.

Sources

Sources in dbt correspond to Decodable's source connections. However, dbt source command is not supported at the moment.

Documentation

dbt docs is not supported at the moment. You can check your Decodable account for details about your models.

Testing

Based on the materialize_tests option set for the current target, dbt test will behave differently:

  • materialize_tests = false will cause dbt to run the specified tests as previews return the results after they finish. The exact time the preview runs for, as well as whether they run starting positions should be set to earliest or latest can be changed using the timeout and preview_start target configurations respectively.

  • materialize_tests = true will cause dbt to persist the specified tests as pipeline/stream pairs on Decodable. This configuration is designed to allow continous testing of your models. You can then run a preview on the created stream (for example using Decodable CLI) to monitor the results.

Snapshots

Neither the [dbt snapshot] command nor the notion of snapshots are supported at the moment.

Additional Operations

dbt-decodable provides a set of commands for managing the project's resources on Decodable. Those commands can be run using dbt run-operation {name} --args {args}.

Example invocation of the delete_streams operation detailed below:

$ dbt run-operation delete_streams --args '{streams: [stream1, stream2], skip_errors: True}'

stop_pipelines(pipelines)

pipelines : Optional list of names. Default value is None.

Deactivate pipelines for resources defined within the project. If the pipelines arg is provided, the command only considers the listed resources. Otherwise, it deactivates all pipelines associated with the project.


delete_pipelines(pipelines)

pipelines : Optional list of names. Default value is None.

Delete pipelines for resources defined within the project. If the pipelines arg is provided, the command only considers the listed resources. Otherwise, it deletes all pipelines associated with the project.


delete_streams(streams, skip_errors)

streams : Optional list of names. Default value is None.
skip_errors : Whether to treat errors as warnings. Default value is true.

Delete streams for resources defined within the project. Note that it does not delete pipelines associated with those streams, failing to remove a stream if one exists. For a complete removal of stream/pipeline pairs, see the cleanup operation.
If the streams arg is provided, the command only considers the listed resources. Otherwise, it attempts to delete all streams associated with the project.
If skip_errors is set to true, failure to delete a stream (e.g. due to an associated pipeline) will be reported as a warning. Otherwise, the operation stops upon the first error encountered.


cleanup(list, models, seeds, tests)

list : Optional list of names. Default value is None.
models : Whether to include models during cleanup. Default value is true.
seeds : Whether to include seeds during cleanup. Default value is true.
tests : Whether to include tests during cleanup. Default value is true.

Delete all Decodable entities resulting from the materialization of the project's resources, i.e. connections, streams and pipelines.
If the list arg is provided, the command only considers the listed resources. Otherwise, it deletes all entities associated with the project.
The models, seeds and tests arguments specify whether those resource types should be included in the cleanup. Note that cleanup does nothing for tests that have not been materialized.

Known limitations

The dbt decodable adapter does not allow managing decodable connectors via dbt. You can only create streams and pipelines with dbt.

Contributions

Contributions to this repository are more than welcome. Please create any pull requests against the [main] branch.

Each release is maintained in a releases/* branch, such as releases/v1.3.2, and there's a tag for it.

Build local version

pip install .

How to create a release

This is based on an example release called v1.3.3.

# We assume to be on 'main'.
# Fork into release branch
git checkout -b releases/v1.3.3
# Edit pyproject.toml and set: version = "1.3.3"
vi pyproject.toml
# create release commit
git commit -am "[#2] Set version to v1.3.3"
# Create a release with a tag from the GitHub UI pointing to the commit we just created.
# CI will do the rest.

License

This code base is available under the Apache License, version 2.

Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.

dbt-decodable's People

Contributors

dependabot[bot] avatar nicoweidner avatar rmetzger avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

dbt-decodable's Issues

Can't generate change stream via dbt

When adding the following pipeline via dbt:

SELECT p.category, sum(o.price)
FROM gunnar_orders o
    LEFT JOIN gunnar_products p ON o.product_id = p.id
    LEFT JOIN gunnar_shipments s ON s.order_id = o.order_id
GROUP BY p.category

I'm getting this error:

(dbt-venv) ➜  example git:(main) ✗ dbt run
12:01:14  Running with dbt=1.3.3
12:01:14  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 273 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
12:01:14
12:01:16  Concurrency: 1 threads (target='dev')
12:01:16
12:01:16  1 of 1 START sql table model gunnar_orders_aggregated .......................... [RUN]
12:01:19  1 of 1 ERROR creating sql table model gunnar_orders_aggregated ................. [ERROR in 2.71s]
12:01:19
12:01:19  Finished running 1 table model in 0 hours 0 minutes and 5.28 seconds (5.28s).
12:01:19
12:01:19  Completed with 1 error and 0 warnings:
12:01:19
12:01:19  Compilation Error in model gunnar_orders_aggregated (models/example/gunnar_orders_aggregated.sql)
12:01:19    Error checking changes to the 'gunnar_orders_aggregated' stream: Compilation Error
12:01:19      Type 'STRING PRIMARY KEY' not recognized
12:01:19
12:01:19    > in macro materialization_table_decodable (macros/materializations/table/table.sql)
12:01:19    > called by model gunnar_orders_aggregated (models/example/gunnar_orders_aggregated.sql)
12:01:19
12:01:19  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

Target stream is deleted and recreated even if no schema changes are required

If

  • the target stream exists when executing dbt run, and
  • changes are detected in the sql script (*),

then both the target stream as well as the pipeline are deleted and recreated - even if the stream schema matches the sql. There should be no need to delete and recreate the stream if the schema matches.

(*) Changes can be as trivial as added whitespace. Maybe this could be an issue in itself, though I am not sure how hard it would be to implement a good solution.

Table sink doesn't support consuming update and delete changes

Query in dbt:

select o.order_id, o.price, p.name, p.category, s.*
from gunnar_orders o
  inner join gunnar_products p on o.product_id = p.product_id
  left join gunnar_shipments s on s.order_id = o.order_id

Failure:

15:58:29  Decodable: InvalidRequest: {'timestamp': '2023-03-24T15:58:29.641+00:00', 'status': 400, 'error': 'Bad Request', 'message': "Invalid pipeline. Reason: Table sink 'default_catalog.default_database.gunnar_orders_joined' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(order_id0 = order_id)], select=[order_id, price, name, category, shipment_id, order_id0, origin, destination, is_arrived], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])", 'path': '/v1alpha2/pipelines'}

Add seed to example project

There is currently no example for a seed. A basic .csv should be added to the existing example project or a new one.

Type 'TIMESTAMP(3) WITH LOCAL TIME ZONE' not recognized

This one works in the UI but not via dbt:

select
  after.product_id,
  before.name as old_name,
  after.name as new_name,
  to_timestamp_ltz(ts_ms, 3) as update_time
from table(to_append(`gunnar_products`))
where op = 'u'
and before.name <> after.name
17:02:53  Completed with 1 error and 0 warnings:
17:02:53
17:02:53  Compilation Error in model gunnar_product_name_changes (models/example/gunnar_product_name_changes.sql)
17:02:53    Error creating the gunnar_product_name_changes stream: Compilation Error
17:02:53      Type 'TIMESTAMP(3) WITH LOCAL TIME ZONE' not recognized
17:02:53
17:02:53    > in macro decodable__create_table_as (macros/materializations/table/create_table_as.sql)
17:02:53    > called by macro create_table_as (macros/materializations/models/table/create_table_as.sql)
17:02:53    > called by macro statement (macros/etc/statement.sql)
17:02:53    > called by macro materialization_table_decodable (macros/materializations/table/table.sql)
17:02:53    > called by model gunnar_product_name_changes (models/example/gunnar_product_name_changes.sql)

Add namespace to Getting Started example

In order to prevent accidental deletion of existing streams, and to make users aware of the namespace feature, we should add a namespace to the Getting Started guide.

"dbt seed --full-refresh" doesn't work if the relation exists already

I created test_seed.csv in the seeds folder with the following content:

field1,field2,field3
value11,value21,1
value12,value22,2

Then I ran dbt seed, which created and populated the stream dbt_demo__test_seed as expected (where dbt_demo is my local namespace).

However, when running dbt seed --full-refresh afterwards, I get this error:

15:41:12  Unhandled error while executing seed.example.test_seed
Decodable: ResourceAlreadyExists: {'timestamp': '2024-01-14T15:41:12.112+00:00', 'message': 'Unable to delete stream id [cad36ab9] as it is referenced by connection [ff748cbd].'}

Release version 1.3.5

  • Added namespace to Getting Started example
  • Migrated SQL preview and stream clear usage to the data plane. This should have no noticable impact on the user, but is required to ensure compatibility with Decodable platform once the legacy control plane endpoints are removed

Provide intermediate output for (non-materialized) tests

Non-materialized tests currently run a SQL preview and only output anything once the preview is complete - which, as of now, can take a while and appear unresponsive (75s in some basic tests I did). It would be good to periodically output something to inform the user of the current state ("Submitted preview query", "Waiting for results", ...)

Can't work with decodable-demo workspace

This fails:

dbt run
16:35:10  Running with dbt=1.3.3
16:35:10  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 273 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
16:35:10
16:35:11  Encountered an error:
Runtime Error
  Status code: 403. Decodable connection failed. Try running 'decodable login' first

But I am definitely logged in, e.g. decodable connection list shows me all connections.

Materialized tests do not work

Trying to run a materialized test leads to the following error:

12:09:48  Completed with 1 error and 0 warnings:
12:09:48  
12:09:48  Compilation Error in macro decodable__create_table_as (macros/materializations/table/create_table_as.sql)
12:09:48    macro 'dbt_macro__decodable__create_table_as' takes not more than 3 argument(s)
12:09:48    
12:09:48    > in macro create_table_as (macros/materializations/models/table/create_table_as.sql)
12:09:48    > called by macro statement (macros/etc/statement.sql)
12:09:48    > called by macro materialize_test_as_table (macros/materializations/test/test_as_table.sql)
12:09:48    > called by macro materialization_test_decodable (macros/materializations/test/test.sql)
12:09:48    > called by macro decodable__create_table_as (macros/materializations/table/create_table_as.sql)

Looking at the code, I see that the create_table_as macro takes only 3 arguments, while the test_as_table macro tries to call it with 4

"dbt seed" sometimes fails to deactivate the connection

I have the following basic test_seed.csv file in the seeds folder:

field1,field2,field3
value11,value21,1
value12,value22,2

I ran dbt seed twice (testing for issue 33). The second time, I received this error:

15:50:53  Unhandled error while executing seed.example.test_seed
Decodable: ResourceAlreadyExists: {'timestamp': '2024-01-14T15:50:53.806+00:00', 'message': 'Could not change the job state from [STARTING] to [STOPPING], probably because an existing operation is in progress. Please wait for the operation to complete.'}

At the same time, the old records were gone and the stream was populated with two new records. So it looks like:

  • The stream was cleared
  • The connection was restarted to repopulate the stream
  • The stream was repopulated

(Unexpected behavior starts here)

  • The attempt to deactivate the connection failed as it was still in STARTING state

While it is certainly unexpected that the connection already produced records while still being in STARTING state, the dbt client should wait until it observes RUNNING state before deactivation.

Side note: Looks like all 409 errors are classified as ResourceAlreadyExists. That's not precise, as this example shows. Created #36 to cover that.

Migrate SQL preview to new BYOC architecture

The introduction of BYOC to Decodable brought along a change in architecture - a much clearer split between the control plane and data plane.
Previously, previews were started and fetched directly from the control plane API. After this change, authorization tokens have to be obtained from the control plane, while the preview is started and fetched from the data plane API.

See https://docs.decodable.co/reference/createpreviewtokens, https://docs.decodable.co/reference/createsqlpreview, https://docs.decodable.co/reference/getsqlpreview.

Can't work with profile not named "default"?

Getting this error:

dbt run
16:20:57  Running with dbt=1.3.3
16:20:57  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 273 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
16:20:57
16:20:58  Encountered an error:
Undefined 'default in decodable profile file ~/.decodable/auth
16:20:58  Traceback (most recent call last):
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/main.py", line 135, in main
    results, succeeded = handle_and_check(args)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/main.py", line 198, in handle_and_check
    task, res = run_from_args(parsed)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/main.py", line 245, in run_from_args
    results = task.run()
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/task/runnable.py", line 472, in run
    result = self.execute_with_hooks(selected_uids)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/task/runnable.py", line 434, in execute_with_hooks
    self.before_run(adapter, selected_uids)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/task/run.py", line 426, in before_run
    self.populate_adapter_cache(adapter, required_schemas)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/task/runnable.py", line 409, in populate_adapter_cache
    adapter.set_relations_cache(self.manifest)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/adapters/base/impl.py", line 437, in set_relations_cache
    self._relations_cache_for_schemas(manifest, required_schemas)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/adapters/base/impl.py", line 414, in _relations_cache_for_schemas
    for relation in future.result():
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/utils.py", line 480, in connected
    return func(*args, **kwargs)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/adapters/decodable/impl.py", line 348, in list_relations_without_caching
    stream_list: List[Dict[str, Any]] = self._client().list_streams().items
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/adapters/decodable/impl.py", line 641, in _client
    self.get_thread_connection().handle
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/contracts/connection.py", line 94, in handle
    self._handle.resolve(self)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/contracts/con
nection.py", line 116, in resolve
    return self.opener(connection)
  File "demo/dbt-venv/lib/python3.9/site-packages/dbt/adapters/decodable/connections.py", line 114, in open
    client = DecodableClientFactory.create_client(
  File "demo/dbt-venv/lib/python3.9/site-packages/decodable/client/client_factory.py", line 34, in create_client
    raise Exception(
Exception: Undefined 'default in decodable profile file ~/.decodable/auth

My ~/.decodable/config looks like this (note the active profile is "decodable-demo"):

version: 1.0.0
active-profile: decodable-demo
profiles:
    decodable-demo:
        account: decodable-demo
    default:
        account: decodable-test

Improve API error classification

As observed in #35, the current error classification could use improvement. All 409 errors are described as ResourceAlreadyExists, which is not precise (see linked issue for an example).

I wonder why other error codes don't appear here either (e.g. 403, 415, ...)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.