dbt-athena / dbt-athena

The athena adapter plugin for dbt (https://getdbt.com)

Home Page: https://dbt-athena.github.io

License: Apache License 2.0

Python 99.60% Makefile 0.40%
athena dbt dbt-athena dbt-athena-community glue-catalog s3 iceberg

dbt-athena's Introduction

Features

  • Supports dbt version 1.7.*
  • Supports Python models
  • Supports seeds
  • Correctly detects views and their columns
  • Supports table materialization
    • Iceberg tables are supported only with Athena engine v3 and a unique table location (see the table location section below)
    • Hive tables are supported by both Athena engines
  • Supports incremental models
    • On Iceberg tables:
      • Supports the use of unique_key only with the merge strategy
      • Supports the append strategy
    • On Hive tables:
      • Supports two incremental update strategies: insert_overwrite and append
      • Does not support the use of unique_key
  • Supports snapshots

Quick Start

Installation

  • pip install dbt-athena-community
  • Or pip install git+https://github.com/dbt-athena/dbt-athena.git

Prerequisites

To start, you will need an S3 bucket (for instance my-bucket) and an Athena database:

CREATE DATABASE IF NOT EXISTS analytics_dev
COMMENT 'Analytics models generated by dbt (development)'
LOCATION 's3://my-bucket/'
WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='[email protected]');

Notes:

  • Take note of your AWS region code (e.g. us-west-2 or eu-west-2, etc.).
  • You can also use AWS Glue to create and manage Athena databases.

Credentials

Credentials can be passed directly to the adapter, or they can be determined automatically based on AWS CLI/boto3 conventions. You can either:

  • configure aws_access_key_id and aws_secret_access_key
  • configure aws_profile_name to match a profile defined in your AWS credentials file (check out the dbt profile configuration below for details)
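
For illustration, a minimal sketch of the credential-related keys inside a profiles.yml target (the access key values are the standard AWS documentation placeholders, and my-profile is hypothetical):

# Option 1: pass keys explicitly
aws_access_key_id: AKIAIOSFODNN7EXAMPLE
aws_secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

# Option 2: use a named profile from your AWS credentials file
aws_profile_name: my-profile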

Configuring your profile

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---|---|---|---|
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | s3://bucket/dbt/ |
| s3_data_dir | Prefix for storing tables, if different from the connection's s3_staging_dir | Optional | s3://bucket2/dbt/ |
| s3_data_naming | How to generate table paths in s3_data_dir | Optional | schema_table_unique |
| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's s3_data_dir | Optional | s3://bucket3/dbt/ |
| region_name | AWS region of your Athena instance | Required | eu-west-1 |
| schema | Specify the schema (Athena database) to build models into (lowercase only) | Required | dbt |
| database | Specify the database (data catalog) to build models into (lowercase only) | Required | awsdatacatalog |
| poll_interval | Interval in seconds used for polling the status of query results in Athena | Optional | 5 |
| debug_query_state | Flag if a debug message with the Athena query state is needed | Optional | false |
| aws_access_key_id | Access key ID of the user performing requests | Optional | AKIAIOSFODNN7EXAMPLE |
| aws_secret_access_key | Secret access key of the user performing requests | Optional | wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY |
| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | my-profile |
| work_group | Identifier of the Athena workgroup | Optional | my-custom-workgroup |
| num_retries | Number of times to retry a failing query | Optional | 3 |
| spark_work_group | Identifier of the Athena Spark workgroup | Optional | my-spark-workgroup |
| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | 5 |
| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | {"ACL": "bucket-owner-full-control"} |
| lf_tags_database | Default LF tags for a new database if it's created by dbt | Optional | tag_key: tag_value |

Example profiles.yml entry:

athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://athena-query-results/dbt/
      s3_data_dir: s3://your_s3_bucket/dbt/
      s3_data_naming: schema_table
      s3_tmp_table_dir: s3://your_s3_bucket/temp/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      threads: 4
      aws_profile_name: my-profile
      work_group: my-workgroup
      spark_work_group: my-spark-workgroup
      seed_s3_upload_args:
        ACL: bucket-owner-full-control

Additional information

  • threads is supported
  • database and catalog can be used interchangeably

Models

Table Configuration

  • external_location (default=none)
    • If set, the full S3 path in which the table will be saved.
    • It works only with incremental models.
    • Does not work with Hive tables with ha set to true.
  • partitioned_by (default=none)
    • A list of columns by which the table will be partitioned
    • Currently limited to the creation of 100 partitions
  • bucketed_by (default=none)
    • A list of columns to bucket data by; ignored if using Iceberg
  • bucket_count (default=none)
    • The number of buckets for bucketing your data; ignored if using Iceberg
  • table_type (default='hive')
    • The type of table
    • Supports hive or iceberg
  • ha (default=false)
    • If the table should be built using the high-availability method. This option is only available for Hive tables, since high availability is the default behavior for Iceberg tables (see the section below)
  • format (default='parquet')
    • The data format for the table
    • Supports ORC, PARQUET, AVRO, JSON, TEXTFILE
  • write_compression (default=none)
    • The compression type to use for any storage format that allows compression to be specified. To see which options are available, check out CREATE TABLE AS
  • field_delimiter (default=none)
    • Custom field delimiter, for when format is set to TEXTFILE
  • table_properties: table properties to add to the table, valid for Iceberg only
  • native_drop: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be made to manage data in S3; data in S3 will only be cleaned up for Iceberg tables (see the AWS docs). Note that Iceberg DROP TABLE operations may time out if they take longer than 60 seconds.
  • seed_by_insert (default=false)
    • The default behaviour uploads seed data to S3. This flag creates seeds using a SQL INSERT statement instead
    • Large seed files cannot use seed_by_insert, as the SQL INSERT statement would exceed the Athena limit of 262144 bytes
  • force_batch (default=false)
    • Skip creating the table via CTAS and run the operation directly in batch insert mode.
    • This is particularly useful when the standard table creation process fails due to partition limitations, allowing you to work with temporary tables and persist the dataset more efficiently.
  • lf_tags_config (default=none)
    • AWS lakeformation tags to associate with the table and columns
    • enabled (default=False) whether LF tags management is enabled for a model
    • tags dictionary with tags and their values to assign for the model
    • tags_columns dictionary with a tag key, value and list of columns they must be assigned to
    • lf_inherited_tags (default=none)
      • List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be removed during association of those defined in lf_tags_config
        • i.e., the default behavior of lf_tags_config is to be exhaustive and first remove any pre-existing tags from tables and columns before associating the ones currently defined for a given model
        • This breaks tag inheritance, as inherited tags appear on tables and columns just like those associated directly
{{
  config(
    materialized='incremental',
    incremental_strategy='append',
    on_schema_change='append_new_columns',
    table_type='iceberg',
    schema='test_schema',
    lf_tags_config={
          'enabled': true,
          'tags': {
            'tag1': 'value1',
            'tag2': 'value2'
          },
          'tags_columns': {
            'tag1': {
              'value1': ['column1', 'column2'],
              'value2': ['column3', 'column4']
            }
          },
          'inherited_tags': ['tag1', 'tag2']
    }
  )
}}
  • format for dbt_project.yml:
  +lf_tags_config:
    enabled: true
    tags:
      tag1: value1
      tag2: value2
    tags_columns:
      tag1:
        value1: [ column1, column2 ]
    inherited_tags: [ tag1, tag2 ]
  • lf_grants (default=none)

    • Lake Formation grants config for data_cell filters
    • format:
    lf_grants={
            'data_cell_filters': {
                'enabled': True | False,
                'filters': {
                    'filter_name': {
                        'row_filter': '<filter_condition>',
                        'principals': ['principal_arn1', 'principal_arn2']
                    }
                }
            }
        }

Notes:

  • lf_tags and lf_tags_columns configs support only attaching LF tags to the corresponding resources. We recommend managing LF tag permissions outside of dbt, for example with Terraform or AWS CDK.
  • data_cell_filters management can't be automated outside of dbt, because a filter can't be attached to a table that doesn't exist yet. Once you enable this config, dbt will set all filters and their permissions during every dbt run. This approach keeps the row-level security configuration up to date after every dbt run and applies changes as they occur: dropping, creating, and updating filters and their permissions.
  • Any tags listed in lf_inherited_tags should be strictly inherited from the database level and never overridden at the table and column level
    • Currently dbt-athena does not differentiate between an inherited tag association and an override of the same tag that it made previously
    • e.g. if an inherited tag is overridden by an lf_tags_config value in one dbt run, and that override is removed prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (neither in e.g. Terraform, where the inherited value is configured, nor in the dbt project, where the override previously existed but is now gone)

Table location

The location in which a table is saved is determined by:

  1. If external_location is defined, that value is used.
  2. If s3_data_dir is defined, the path is determined by that setting together with s3_data_naming.
  3. If s3_data_dir is not defined, data is stored under s3_staging_dir/tables/.

Here are all the available options for s3_data_naming:

  • unique: {s3_data_dir}/{uuid4()}/
  • table: {s3_data_dir}/{table}/
  • table_unique: {s3_data_dir}/{table}/{uuid4()}/
  • schema_table: {s3_data_dir}/{schema}/{table}/
  • schema_table_unique: {s3_data_dir}/{schema}/{table}/{uuid4()}/

It's possible to set s3_data_naming globally in the target profile, override the value in a table config, or set it for groups of models in dbt_project.yml.
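
For example, a hedged sketch of overriding it for a group of models in dbt_project.yml (project and folder names are illustrative):

models:
  my_project:
    marts:
      +s3_data_naming: table_unique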

Note: when using a workgroup with a default output location configured, s3_data_naming and any configured buckets are ignored and the location configured in the workgroup is used.

Incremental models

Support for incremental models.

These strategies are supported:

  • insert_overwrite (default): The insert overwrite strategy deletes the overlapping partitions from the destination table, and then inserts the new records from the source. This strategy depends on the partitioned_by keyword! If no partitions are defined, dbt will fall back to the append strategy.
  • append: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate data (e.g. great for log or historical data).
  • merge: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with unique_key. Only available when using Iceberg.
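
For illustration, a minimal sketch of an insert_overwrite incremental model (the upstream model stg_events and its columns are hypothetical):

{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitioned_by=['event_date'],
    format='parquet'
) }}

select event_id, payload, event_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where event_date > (select max(event_date) from {{ this }})
{% endif %}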

On schema change

on_schema_change is an option to reflect changes of schema in incremental models. The following options are supported:

  • ignore (default)
  • fail
  • append_new_columns
  • sync_all_columns

For details, please refer to dbt docs.
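
For illustration, a minimal sketch of an incremental model that appends new columns when the upstream schema changes (the model name stg_events is hypothetical):

{{ config(
    materialized='incremental',
    incremental_strategy='append',
    on_schema_change='append_new_columns'
) }}

select *
from {{ ref('stg_events') }}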

Iceberg

The adapter supports table materialization for Iceberg.

To get started, just add this to your model:

{{ config(
    materialized='table',
    table_type='iceberg',
    format='parquet',
    partitioned_by=['bucket(user_id, 5)'],
    table_properties={
     'optimize_rewrite_delete_file_threshold': '2'
     }
) }}

select 'A'          as user_id,
       'pi'         as name,
       'active'     as status,
       17.89        as cost,
       1            as quantity,
       100000000    as quantity_big,
       current_date as my_date

Iceberg supports bucketing as hidden partitions, therefore use the partitioned_by config to add specific bucketing conditions.

Iceberg supports several table formats for data: PARQUET, AVRO and ORC.

It is possible to use Iceberg in an incremental fashion; specifically, two strategies are supported:

  • append: New records are appended to the table; this can lead to duplicates.
  • merge: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only available with Athena engine version 3.
    • unique_key (required): columns that define a unique record in the source and target tables.
    • incremental_predicates (optional): SQL conditions that enable custom join clauses in the merge statement. This can be useful for improving performance via predicate pushdown on the target table.
    • delete_condition (optional): SQL condition used to identify records that should be deleted.
    • update_condition (optional): SQL condition used to identify records that should be updated.
    • insert_condition (optional): SQL condition used to identify records that should be inserted.
      • incremental_predicates, delete_condition, update_condition and insert_condition can include any column of the incremental table (src) or the final table (target). Column names must be prefixed by either src or target to prevent a Column is ambiguous error.

delete_condition example:

{{ config(
    materialized='incremental',
    table_type='iceberg',
    incremental_strategy='merge',
    unique_key='user_id',
    incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
    delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
    format='parquet'
) }}

select 'A' as user_id,
       'pi' as name,
       'active' as status,
       17.89 as cost,
       1 as quantity,
       100000000 as quantity_big,
       current_date as my_date

update_condition example:

{{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['id'],
        update_condition='target.id > 1',
        schema='sandbox'
    )
}}

{% if is_incremental() %}

select * from (
    values
    (1, 'v1-updated')
    , (2, 'v2-updated')
) as t (id, value)

{% else %}

select * from (
    values
    (-1, 'v-1')
    , (0, 'v0')
    , (1, 'v1')
    , (2, 'v2')
) as t (id, value)

{% endif %}

insert_condition example:

{{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['id'],
        insert_condition='target.status != 0',
        schema='sandbox'
    )
}}

select * from (
    values
    (1, 0)
    , (2, 1)
) as t (id, status)

Highly available table (HA)

The current implementation of the table materialization can lead to downtime, as the target table is dropped and re-created. For a less destructive behavior, it's possible to use the ha config on your table materialized models. It leverages the table versions feature of the Glue catalog, creating a tmp table and swapping the target table to the location of the tmp table. This materialization is only available for table_type=hive and requires using unique locations. For Iceberg, high availability is the default.

{{ config(
    materialized='table',
    ha=true,
    format='parquet',
    table_type='hive',
    partitioned_by=['status'],
    s3_data_naming='table_unique'
) }}

select 'a'      as user_id,
       'pi'     as user_name,
       'active' as status
union all
select 'b'        as user_id,
       'sh'       as user_name,
       'disabled' as status

By default, the materialization keeps the last 4 table versions; you can change this by setting versions_to_keep.
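
A hedged sketch of raising the retention on an HA model (the value 8 is illustrative; verify that versions_to_keep is accepted as a model config in your adapter version):

{{ config(
    materialized='table',
    ha=true,
    table_type='hive',
    s3_data_naming='table_unique',
    versions_to_keep=8
) }}

select 'a' as user_id, 'active' as status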

HA Known issues

  • When swapping from a table with partitions to a table without (and the other way around), there could be a little downtime. If high performance is needed, consider bucketing instead of partitions.
  • By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location
  • It's recommended to have versions_to_keep >= 4, as this will avoid having the older location removed

Snapshots

The adapter supports snapshot materialization, with both the timestamp and check strategies. To create a snapshot, create a snapshot file in the snapshots directory. If the directory does not exist, create one.

Timestamp strategy

To use the timestamp strategy, refer to the dbt docs.

Check strategy

To use the check strategy, refer to the dbt docs.

Hard-deletes

The materialization also supports invalidating hard deletes. Check the docs to understand usage.

AWS Lakeformation integration

The adapter implements AWS Lake Formation tag management in the following way:

  • you can enable or disable lf-tags management via config (disabled by default)
  • once you enable the feature, lf-tags will be updated on every dbt run
  • first, all lf-tags for columns are removed to avoid inheritance issues
  • then all redundant lf-tags are removed from the table, and the tags from the config are applied
  • finally, lf-tags for columns are applied

It's important to understand the following points:

  • dbt does not manage lf-tags for database
  • dbt does not manage lakeformation permissions

That's why you should handle this yourself, either manually or with automation tools such as Terraform or AWS CDK.

Python Models

The adapter supports Python models using Spark.

Setup

  • A Spark-enabled workgroup created in Athena
  • A Spark execution role granted access to Athena, Glue and S3
  • The Spark workgroup is added to the ~/.dbt/profiles.yml file, and the profile is referenced in the dbt_project.yml that will be created. It is recommended to keep this the same as threads.

Spark specific table configuration

  • timeout (default=43200)
    • Timeout in seconds for each Python model execution. Defaults to 43200 seconds (12 hours).
  • spark_encryption (default=false)
    • If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored locally by Spark.
  • spark_cross_account_catalog (default=false)
    • In Spark you can query an external account's catalog; for that, the consumer account has to be configured to access the producer catalog.
    • If this flag is set to true, "/" can be used as the Glue catalog separator, e.g. 999999999999/mydatabase.cloudfront_logs (where 999999999999 is the external catalog ID)
  • spark_requester_pays (default=false)
    • When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for data access and data transfer fees associated with the query.
    • If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark.

Spark notes

  • A session is created for each unique engine configuration defined in the models that are part of the invocation.
  • A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation (spark python model) ready for execution and the engine configuration matches, the process will reuse the same session.
  • The number of Python models running at a time depends on threads. The number of sessions created for the entire run depends on the number of unique engine configurations and the availability of sessions to maintain thread concurrency.
  • For Iceberg tables, it is recommended to use the table_properties configuration to set format_version to 2, to maintain compatibility between Iceberg tables created by Trino and those created by Spark; see the sketch below.
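
For example, a hedged sketch of an Iceberg Python model pinning format_version to 2 via table_properties (whether table_properties is forwarded from Python model config should be verified for your adapter version; the data frame is illustrative):

import pandas as pd


def model(dbt, session):
    # Request an Iceberg table and pin the Iceberg spec version to 2
    dbt.config(
        materialized="table",
        table_type="iceberg",
        table_properties={"format_version": "2"},
    )
    return pd.DataFrame({"A": [1, 2, 3, 4]})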

Example models

Simple pandas model

import pandas as pd


def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df

Simple spark

def model(dbt, spark_session):
    dbt.config(materialized="table")

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df

Spark incremental

def model(dbt, spark_session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = (
            f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        )
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])

    return df

Config spark model

def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        engine_config={
            "CoordinatorDpuSize": 1,
            "MaxConcurrentDpus": 3,
            "DefaultExecutorDpuSize": 1
        },
        spark_encryption=True,
        spark_cross_account_catalog=True,
        spark_requester_pays=True,
        polling_interval=15,
        timeout=120,
    )

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df

Create a PySpark UDF using imported external Python files

def model(dbt, spark_session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="num",
    )
    sc = spark_session.sparkContext
    sc.addPyFile("s3://athena-dbt/test/file1.py")
    sc.addPyFile("s3://athena-dbt/test/file2.py")

    def func(iterator):
        from file2 import transform

        return [transform(i) for i in iterator]

    from pyspark.sql.functions import udf
    from pyspark.sql.functions import col

    udf_with_import = udf(func)

    data = [(1, "a"), (2, "b"), (3, "c")]
    cols = ["num", "alpha"]
    df = spark_session.createDataFrame(data, cols)

    return df.withColumn("udf_test_col", udf_with_import(col("alpha")))

Known issues in python models

  • Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic that runs on Trino.
  • Snapshot materializations are not supported.
  • Spark can only reference tables within the same catalog.

Working example

seed file - employment_indicators_november_2022_csv_tables.csv

Series_reference,Period,Data_value,Suppressed
MEIM.S1WA,1999.04,80267,
MEIM.S1WA,1999.05,70803,
MEIM.S1WA,1999.06,65792,
MEIM.S1WA,1999.07,66194,
MEIM.S1WA,1999.08,67259,
MEIM.S1WA,1999.09,69691,
MEIM.S1WA,1999.1,72475,
MEIM.S1WA,1999.11,79263,
MEIM.S1WA,1999.12,86540,
MEIM.S1WA,2000.01,82552,
MEIM.S1WA,2000.02,81709,
MEIM.S1WA,2000.03,84126,
MEIM.S1WA,2000.04,77089,
MEIM.S1WA,2000.05,73811,
MEIM.S1WA,2000.06,70070,
MEIM.S1WA,2000.07,69873,
MEIM.S1WA,2000.08,71468,
MEIM.S1WA,2000.09,72462,
MEIM.S1WA,2000.1,74897,

model.sql

{{ config(
    materialized='table'
) }}

select row_number() over() as id
       , *
       , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp
from {{ ref('employment_indicators_november_2022_csv_tables') }}

timestamp strategy - model_snapshot_1

{% snapshot model_snapshot_1 %}

{{
    config(
      strategy='timestamp',
      updated_at='refresh_timestamp',
      unique_key='id'
    )
}}

select *
from {{ ref('model') }} {% endsnapshot %}

invalidate hard deletes - model_snapshot_2

{% snapshot model_snapshot_2 %}

{{
    config
    (
        unique_key='id',
        strategy='timestamp',
        updated_at='refresh_timestamp',
        invalidate_hard_deletes=True,
    )
}}
select *
from {{ ref('model') }} {% endsnapshot %}

check strategy - model_snapshot_3

{% snapshot model_snapshot_3 %}

{{
    config
    (
        unique_key='id',
        strategy='check',
        check_cols=['series_reference','data_value']
    )
}}
select *
from {{ ref('model') }} {% endsnapshot %}

Snapshots Known issues

  • Incremental Iceberg models - Sync all columns on schema change can't remove columns used as partitioning. The only way, from a dbt perspective, is to do a full-refresh of the incremental model.

  • Tables, schemas and databases should only be lowercase

  • In order to avoid potential conflicts, make sure dbt-athena-adapter is not installed in the target environment. See #103 for more details.

  • Snapshots do not support dropping columns from the source table. If you drop a column, make sure to drop the column from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history.

Contracts

The adapter partially supports contract definitions.

  • data_type is supported, but needs to be adjusted for complex types. These must be specified in full (for instance array<int>) even though they won't be checked. Indeed, as dbt recommends, only the broader type (array, map, int, varchar) is compared. The complete definition is used to check that the data types defined in Athena are correct (pre-flight check).
  • The adapter does not support constraints, since constraints don't exist in Athena.
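
For illustration, a hedged sketch of a contract in a model's YAML file, spelling out a complex type in full (model and column names are hypothetical):

models:
  - name: my_contracted_model
    config:
      contract:
        enforced: true
    columns:
      - name: id
        data_type: int
      - name: tags
        data_type: array<varchar>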

Contributing

See CONTRIBUTING for more information on how to contribute to this project.

Contributors ✨

Thanks goes to these wonderful people (emoji key):

Contributions of any kind welcome!

dbt-athena's People

Contributors

allcontributors[bot], artem-garmash, avinash-1394, brabster, brechtdevlieger, brunofaustino, chrischin478, dandandan, daniel-cortez-stevenson, danya-fpnk, dependabot[bot], gatsby-lee, henriblancke, hiro-o918, ignacioreyna, jessedobbelaere, jrmyy, juliansteger-sc, jurgispods, maiarareinaldo, mattiamatrix, mrshu, nicor88, octiva, roslovets, sanromeo, sashakcc, stumelius, svdimchenko, tomme


dbt-athena's Issues

prehook sql throw exception

I wanted to execute a pre_hook query like this:
{{ config(materialized='view', pre_hook="drop table if exists my_first_dbt_model") }}
But I found that in Athena the SQL becomes
-- /* {"app": "dbt", "dbt_version": "1.2.1", "profile_name": "local", "target_name": "dev", "node_id": "model.dbtest.my_first_dbt_model"} */ drop table if exists my_first_dbt_model
Since all the SQL stays on one line, the statement is treated as a comment, so it fails.

Move all utility macros into utils

Move all the utility macros to a dedicated path:
dbt/include/athena/macros/utils/
instead of being in
dbt/include/athena/macros/adapters/

Final location uses backslash on Windows

Hi,

After the model is generated by dbt-athena, the table location looks like this:

s3://my-test-bucket/dbt/resident_communication\ticket_last_comment\6975c131-96c7-4b48-baf3-3b18acb6a431

and I think it should be like this:

s3://my-test-bucket/dbt/resident_communication/ticket_last_comment/6975c131-96c7-4b48-baf3-3b18acb6a431

I think this is caused by Windows OS path behavior; below is my profile.
s3_staging_dir: s3://my-test-bucket/dbt/
s3_data_dir: s3://my-test-bucket/dbt/
s3_data_naming: schema_table_unique

Is there a wrong setting on my side? Thank you very much.

Bump dbt-core to 1.4.0

Use the latest version of dbt-core, 1.4.0.
This change implies bumping the minor version of the adapter too.

Add Pull request template

What

Add a pull request template; the template can follow this structure:

Description

A brief description of what you want to fix or tackle with the PR

Models used to test - Optional

Small section including the models that you use to test.

Checklist

num_retries doesn't work with env_var

Hi,

I'm using env_var in my profile.yml file, but when trying to set the num_retries attribute with env_var it fails with the following error:

ERROR: Runtime Error
  Credentials in profile "my_profile", target "dev" invalid: '0' is not valid under any of the given schemas

To set the env var I ran this: export DBT_ATHENA_NUM_RETRIES=0

My profile.yml:

my_profile:
  outputs:
    dev:
      database: "{{ env_var('DBT_ATHENA_DB') }}"
      region_name: "{{ env_var('DBT_REGION_NAME') }}"
      s3_staging_dir: "{{ env_var('DBT_S3_STAGING_DIR') }}"
      schema: "{{ env_var('DBT_ATHENA_SCHEMA') }}"
      type: athena
      num_retries: "{{ env_var('DBT_ATHENA_NUM_RETRIES') }}"
  target: dev

A few notes:

  • When using the 0 directly it works fine: num_retries: 0
  • The other attributes are assigned as expected using the env_var

Different table creation strategies for Iceberg

Athena engine v3 supports CTAS for Iceberg.

Since engine v2 does not support that, what about adding a table_strategy option to the table materialization with the following alternatives: tmp_table and ctas?

  • ctas is the default for all non-Iceberg tables and the only possibility.
  • tmp_table must be set for engine v2 for Iceberg tables.
  • ctas or tmp_table can be chosen for engine v3 for Iceberg tables.
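
A hypothetical sketch of how the proposed option might look in a model config (table_strategy does not exist yet; it is only the option suggested above):

{{ config(
    materialized='table',
    table_type='iceberg',
    table_strategy='tmp_table'
) }}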

What do you think?

Issues when using post_hooks with incremental materialization

When adding post_hooks to an incremental model, it seems that we are having some issues.

{{ config(
    materialized='incremental',
    format='iceberg',
    incremental_strategy='merge',
    unique_key='user_id',
    partitioned_by=['bucket(5, user_id)', 'status'],
    schema='silver',
    table_properties={
    	'optimize_rewrite_delete_file_threshold': '2'
    },
    post_hook = [
        "ALTER TABLE {{this}} SET TBLPROPERTIES ('vacuum_max_snapshot_age_seconds'='86400');"
    ]
) }}

with data as (
SELECT
	'a' AS user_id,
	'pi_updated' AS name,
	'inactive' AS status,
	17.89 AS cost,
	1 AS quantity,
	100000000 AS quantity_big,
	current_date AS my_date,
	cast(REPLACE(cast(current_timestamp as varchar), ' UTC', '') as timestamp(3)) as now
UNION ALL
SELECT
	'b' AS user_id,
	'beta_updated_yet_another one' AS name,
	'inactive' AS status,
	3 AS cost,
	50 AS quantity,
	100000000 AS quantity_big,
	current_date AS my_date,
	cast(REPLACE(cast(current_timestamp as varchar), ' UTC', '') as timestamp(3)) as now
)

select *
from data

produce this error

Completed with 1 error and 0 warnings:
08:07:30  
08:07:30  Runtime Error in model iceberg_increment (models/iceberg/iceberg_increment.sql)
08:07:30    line 1:243: no viable alternative at input '<EOF>'

looking at the query produced I see this

-- /* {"app": "dbt", "dbt_version": "1.3.0", "profile_name": "athena", "target_name": "dev", "node_id": "model.lakehouse.iceberg_increment"} */ ALTER TABLE silver.iceberg_increment SET TBLPROPERTIES ('vacuum_max_snapshot_age_seconds'='86400')

somehow we have double comments.

dbt init returns "No adapters available"

Hi there, I've done a pip install dbt-athena-community and I'm unable to init a new project. I'm getting this output.

23:38:46  Running with dbt=1.3.1
No adapters available. Go to https://docs.getdbt.com/docs/available-adapters

I've tried on another machine, and running the pip install pointed at the repo, with no luck. I've verified that other adapters installed this way are successfully detected by dbt.

Vacuum with iceberg not supported

When running a post-hook with vacuum on an Iceberg table, I'm getting this error:

botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 3:9: mismatched input 'vacuum'. Expecting: 'ALTER', 'ANALYZE', 'CALL', 'COMMENT', 'COMMIT', 'CREATE', 'DEALLOCATE', 'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DROP', 'EXECUTE', 'EXPLAIN', 'GRANT', 'INSERT', 'MERGE', 'PREPARE', 'REFRESH', 'RESET', 'REVOKE', 'ROLLBACK', 'SET', 'SHOW', 'START', 'TRUNCATE', 'UNLOAD', 'UPDATE', 'USE', 'USING', <query>

~~Having a look at the error, it seems that the issue is on the boto3 side~~
Running this

cursor = connect(
    work_group='athena_v3',
    s3_staging_dir="s3://my_bucket/py_athena/",).cursor()
data = cursor.execute("vacuum silver.iceberg_increment")
print(cursor)

works from the same dbt-athena project, so there is some extra validation that is preventing us from using vacuuming.

[bug] v1.3.4: Invalid bucket name ""

Description

On dbt-athena-community==1.3.4, when running something like dbt run -s +my_model, the run fails with error messages of this kind:

11:57:16  Parameter validation failed:
11:57:16  Invalid bucket name "": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$" or be an ARN matching the regex "^arn:(aws).*:(s3|s3-object-lambda):[a-z\-0-9]*:[0-9]{12}:accesspoint[/:][a-zA-Z0-9\-.]{1,63}$|^arn:(aws).*:s3-outposts:[a-z\-0-9]+:[0-9]{12}:outpost[/:][a-zA-Z0-9\-]{1,63}[/:]accesspoint[/:][a-zA-Z0-9\-]{1,63}$"

So it seems that the bucket location is set to an empty string.

When switching to dbt-athena-community==1.3.3 and keeping everything else as is, the same command succeeds without any errors.

This issue can be observed on both Athena v2 and v3.

This is my profiles.yml:

athena:
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://aws-athena-query-results-eu-central-1-9999999999999/dbt/
      s3_data_dir: s3://aws-athena-tables-data/dbt/bumblebee/dev
      s3_data_naming: schema_table_unique
      region_name: eu-central-1
      database: awsdatacatalog
      schema: dbt_dev_philipp
      work_group: whatever
      num_retries: 1
      threads: 8

Use glue APIs instead of information schema

Based on #4 (comment)

Implementation should be covered here northvolt/dbt-athena@49c7111

Using the Glue API is better as it is faster and should allow multi-threading.

TODO

  • Use the Glue API instead of the information schema
  • Use delete table from the Glue API - consider doing this for Iceberg as well, as timeouts lead to data not being fully deleted. Before dropping, pick the location from the Glue catalog.

Compatibility with Athena Engine v3

๐Ÿ‘‹๐Ÿป Hello dbt-athena squad

For now the adapter uses version 2 of the Athena engine, according to the README. On 2022-10-13, Athena released v3 of their engine, reducing the feature gap between Athena and Trino.

I don't know what we want to do about this:

  1. Does the adapter only support engine v2?
  2. Since we are responsible for the SQL we are creating and we can configure the workgroup, is the engine version really a limitation in the adapter?

FYI, here are the breaking changes.

Multi-threading issues on information_schema queries

I'm switching a few projects to this dbt-athena community adapter and I noticed a regression bug 🐛

Locally, I use threads: 4. I have a few databases defined in my dbt_project.yml:

models:
  test_jesse:
    silver:
      test_a:
        +schema: silver_test_a
      test_b:
        +schema: silver_test_b
      test_c:
        +schema: silver_test_c

When I run dbt --debug run I see it starts by making 3 parallel queries to INFORMATION_SCHEMA (corresponding to the 3 custom schemas)

Output of the failing `dbt --debug run`:
23:22:00.639968 [info ] [MainThread]: Found 79 models, 9 tests, 0 snapshots, 0 analyses, 498 macros, 0 operations, 0 seed files, 64 sources, 0 exposures, 0 metrics
23:22:00.642584 [info ] [MainThread]:
23:22:00.642973 [debug] [MainThread]: Acquiring new athena connection "master"
23:22:00.645492 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:22:00.652730 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:22:00.654241 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:22:00.655511 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:22:00.655686 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:22:00.655861 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:22:00.656061 [debug] [ThreadPool]: On list_awsdatacatalog: /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:22:00.656238 [debug] [ThreadPool]: On list_awsdatacatalog: /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:22:00.656446 [debug] [ThreadPool]: On list_awsdatacatalog: /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:22:00.656774 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:22:00.660507 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:22:00.673268 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:22:00.675392 [error] [ThreadPool]: Athena adapter: Got an error when attempting to open a Athena connection due to 'credential_provider'
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.9/site-packages/dbt/adapters/athena/connections.py", line 155, in open
    handle = AthenaConnection(
  File "/opt/homebrew/lib/python3.9/site-packages/pyathena/connection.py", line 139, in __init__
    self._client = self._session.client(
  File "/opt/homebrew/lib/python3.9/site-packages/boto3/session.py", line 299, in client
    return self._session.create_client(
  File "/opt/homebrew/lib/python3.9/site-packages/botocore/session.py", line 951, in create_client
    credentials = self.get_credentials()
  File "/opt/homebrew/lib/python3.9/site-packages/botocore/session.py", line 507, in get_credentials
    self._credentials = self._components.get_component(
  File "/opt/homebrew/lib/python3.9/site-packages/botocore/session.py", line 1112, in get_component
    del self._deferred[name]
KeyError: 'credential_provider'
23:22:00.688091 [debug] [ThreadPool]: Athena adapter: Error running SQL: /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:22:00.688642 [debug] [ThreadPool]: Athena adapter: Error running SQL: macro list_schemas
23:22:00.688916 [debug] [ThreadPool]: On list_awsdatacatalog: No close available on handle
23:22:07.156437 [debug] [ThreadPool]: SQL status: OK -1 in 6.5 seconds
23:22:07.158140 [debug] [ThreadPool]: On list_awsdatacatalog: Close
23:22:07.159025 [debug] [ThreadPool]: SQL status: OK -1 in 6.5 seconds
23:22:07.159950 [debug] [ThreadPool]: On list_awsdatacatalog: Close
23:22:07.160520 [debug] [MainThread]: Connection 'master' was properly closed.
23:22:07.160681 [debug] [MainThread]: Connection 'list_awsdatacatalog' was properly closed.
23:22:07.160892 [debug] [MainThread]: Connection 'list_awsdatacatalog' was properly closed.
23:22:07.161071 [debug] [MainThread]: Connection 'list_awsdatacatalog' was properly closed.
23:22:07.161847 [debug] [MainThread]: Flushing usage events
23:22:07.162046 [error] [MainThread]: Encountered an error:
Runtime Error
  Runtime Error
    Database Error
      'credential_provider'

When I switch back to Tomme's adapter (dbt-athena-adapter==1.0.1), it makes three queries in parallel successfully.

Output of the successful `dbt --debug run`:
23:23:51.801549 [info ] [MainThread]: Found 79 models, 9 tests, 0 snapshots, 0 analyses, 490 macros, 0 operations, 0 seed files, 64 sources, 0 exposures, 0 metrics
23:23:51.804227 [info ] [MainThread]:
23:23:51.804618 [debug] [MainThread]: Acquiring new athena connection "master"
23:23:51.807446 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:23:51.814927 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:23:51.816278 [debug] [ThreadPool]: Acquiring new athena connection "list_awsdatacatalog"
23:23:51.817486 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:23:51.817641 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:23:51.817786 [debug] [ThreadPool]: Using athena connection "list_awsdatacatalog"
23:23:51.817940 [debug] [ThreadPool]: On list_awsdatacatalog: -- /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:23:51.818153 [debug] [ThreadPool]: On list_awsdatacatalog: -- /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:23:51.818459 [debug] [ThreadPool]: On list_awsdatacatalog: -- /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "dev", "connection_name": "list_awsdatacatalog"} */

    select
        distinct schema_name

    from awsdatacatalog.INFORMATION_SCHEMA.schemata

23:23:51.818715 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:23:51.828140 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:23:51.832063 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:23:58.460654 [debug] [ThreadPool]: SQL status: OK -1 in 6.64 seconds
23:23:58.461749 [debug] [ThreadPool]: On list_awsdatacatalog: Close
23:23:58.465697 [debug] [ThreadPool]: SQL status: OK -1 in 6.65 seconds
23:23:58.466534 [debug] [ThreadPool]: On list_awsdatacatalog: Close
23:23:58.473755 [debug] [ThreadPool]: SQL status: OK -1 in 6.66 seconds
23:23:58.474907 [debug] [ThreadPool]: On list_awsdatacatalog: Close

When I use threads: 1 or threads: 2 it works and successfully deploys the dbt project, but it starts failing with 3 or more threads.

This might be a regression? I found a similar issue Tomme/dbt-athena#41

Iceberg table support

Hi,

I'm looking at the Iceberg tables support plan:

As Iceberg doesn't support CTAS, the implementation does the following:

  • create a tmp table as parquet
  • drop the old table, if it exists
  • create the iceberg table based on the tmp table definition, metadata only
  • insert into it from the tmp table
  • drop the tmp table

I think that the tmp table creation is going to kill performance (I have 10s of billions of rows).

How about this approach:

  1. create the tmp table with the model's SELECT with "LIMIT 0" appended
  2. create iceberg table based on tmp table definition, only metadata
  3. insert into the iceberg table with the model's SQL (don't append the LIMIT this time)
  4. drop the tmp table

This would save one full copy of all rows (hours of processing in my case).

Regards,

ZD

Governance

First, I'd like to open by saying it's great that someone has taken the initiative to fork and revive this adapter under an organization. Hopefully this can sustain active maintenance and let all of us avoid multiple abandoned forks.

@nicor88 asked me to submit my quoting support PR from the old repo here, which I have done; this got me thinking about the governance of this project. As it stands, there are no public members of this organization, and there are no clear contribution guidelines or any code of conduct.

If this is to become a healthy project that people will adopt instead of their own forks I think there are some things that would be good to address:

  • Who owns this org?
  • Who is part of the org? Are all members from the same company? Is there a risk that this will be abandoned if you stop using Athena?
  • Can one become a member?
  • How are decisions about the future of the project made?
  • What is expected of a contributor? I think that clear contribution guidelines and a code of conduct would be good to have.

User agent string grows with each call of pyathena

The root cause of the issue below is pyathena.

After building a bunch of models, the header gets too big and the S3 ListObjects API call fails. It is caused by a user-agent header that grows with every invocation of PyAthena.

So far I just hotfixed it by pinning the version of pyathena to <2.8.0
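
For reference, such a pin might simply look like this in a requirements file:

pyathena<2.8.0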

Where the config is created in dbt-athena and pyathena

Example request after some models are built:
2022-12-27 15:18:27,965 botocore.endpoint [DEBUG] Sending http request: <AWSPreparedRequest stream_output=False, method=GET, url=https://*****.s3.eu-central-1.amazonaws.com/?prefix=datalakesensitive%2Foutput%2Fqueryresults%2Ftables%2Fbc32f394-cfc2-4e9e-b3dd-f67e09599921%2F&encoding-type=url, headers={'User-Agent': b'Boto3/1.26.37 Python/3.8.15 Linux/5.10.102.1-microsoft-standard-WSL2 Botocore/1.29.37 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 PyAthena/2.18.1 dbt-athena-community/1.3.3', 'X-Amz-Date': b'20221227T141827Z', 'X-Amz-Security-Token': b'****', 'X-Amz-Content-SHA256': b'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', 'Authorization': b'****', 'amz-sdk-invocation-id': b'****', 'amz-sdk-request': b'attempt=1'}>

#############################
Initial problem:
#############################

It is caused by a boto3 call in the cleanup_table function of the adapter.

In our project with about 300 dbt models, it failed when we built the full project. When we built it partially, it was OK for some selectors and failed for others.

The boto3 call:
s3_bucket.objects.filter(Prefix=prefix).delete()

Here is the actual exception:
botocore.exceptions.ClientError: An error occurred (RequestHeaderSectionTooLarge) when calling the ListObjects operation: Your request header section exceeds the maximum allowed size.

Here the full stack trace:

[08:52:16.828249 [debug] [Thread-9  ]: Began running node model.climatepartner.stg_fpm__base_project_reservation_account
[08:52:16.839619 [info ] [Thread-9  ]: 392 of 573 START sql table model dev_datalakesensitive_fabi.stg_fpm__base_project_reservation_account  [RUN]
[08:52:16.854983 [debug] [Thread-9  ]: Acquiring new athena connection "model.climatepartner.stg_fpm__base_project_reservation_account"
[08:52:16.875331 [debug] [Thread-9  ]: Began compiling node model.climatepartner.stg_fpm__base_project_reservation_account
[08:52:16.884906 [debug] [Thread-9  ]: Compiling model.climatepartner.stg_fpm__base_project_reservation_account
[08:52:16.916737 [debug] [Thread-9  ]: Writing injected SQL for node "model.climatepartner.stg_fpm__base_project_reservation_account"
[08:52:33.177364 [debug] [Thread-9  ]: Athena adapter: Deleting table data from 's3://bucket-name/datalakesensitive/output/tables/dev_datalakesensitive_fabi/stg_fpm__base_project_reservation_account/ee9fab99-03d0-44eb-964a-47e71e0aa5d1/'
[08:52:34.331420 [debug] [Thread-9  ]: finished collecting timing info
[08:52:34.331870 [debug] [Thread-9  ]: On model.climatepartner.stg_fpm__base_project_reservation_account: Close
[08:52:34.332278 [error] [Thread-9  ]: Unhandled error while executing model.climatepartner.stg_fpm__base_project_reservation_account
An error occurred (RequestHeaderSectionTooLarge) when calling the ListObjects operation: Your request header section exceeds the maximum allowed size.
[08:52:34.332610 [debug] [Thread-9  ]: 
Traceback (most recent call last):
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/task/base.py", line 385, in safe_run
    result = self.compile_and_execute(manifest, ctx)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/task/base.py", line 338, in compile_and_execute
    result = self.run(ctx.node, manifest)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/task/base.py", line 429, in run
    return self.execute(compiled_node, manifest)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/task/run.py", line 281, in execute
    result = MacroGenerator(
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/clients/jinja.py", line 326, in __call__
    return self.call_macro(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/clients/jinja.py", line 253, in call_macro
    return macro(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 763, in __call__
    return self._invoke(arguments, autoescape)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 777, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 49, in macro
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 298, in call
    return __obj(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/clients/jinja.py", line 326, in __call__
    return self.call_macro(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/clients/jinja.py", line 253, in call_macro
    return macro(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 763, in __call__
    return self._invoke(arguments, autoescape)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 777, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 22, in macro
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/jinja2/runtime.py", line 298, in call
    return __obj(*args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/dbt/adapters/athena/impl.py", line 163, in clean_up_table
    s3_bucket.objects.filter(Prefix=prefix).delete()
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/boto3/resources/collection.py", line 561, in batch_action
    return action(self, *args, **kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/boto3/resources/action.py", line 134, in __call__
    for page in parent.pages():
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/boto3/resources/collection.py", line 171, in pages
    for page in pages:
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/fabi/workspace/bi-models/.venv/lib/python3.8/site-packages/botocore/client.py", line 960, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (RequestHeaderSectionTooLarge) when calling the ListObjects operation: Your request header section exceeds the maximum allowed size.

[08:52:34.422051 [error] [Thread-9  ]: 392 of 573 ERROR creating sql table model dev_datalakesensitive_fabi.stg_fpm__base_project_reservation_account  [ERROR in 17.50s]

Dynamic `config.external_location` setting does not play well with `state:modified`

Summary

The external_location config setting is respected during state comparison when running dbt with --state ... -s state:modified option.
This is troublesome in CI scenarios where we compare a development branch to the manifest.json of the production target.
As a consequence, dbt considers all models modified which make use of the external_location setting in a target-specific manner.

Expected behavior

The model should not be considered changed if only the dynamically set external_location differs.

We use many other target-specific settings, e.g. custom schemas or databases, and all of those play nicely with state:modified. So, the observed behavior is unexpected in my eyes.

How to reproduce

  • Have at least 2 targets defined, in this example we use dev and prod.
  • Set your model's config to something like
{{
    config(
        materialized='table'
        , external_location='s3://whatever/' ~ target.name
    )
}}
  • Compile the project for one of the two targets: dbt compile --target prod
  • Check which models dbt considers changed when using the other target: dbt ls --state target -s state:modified --target dev

Then this model will be shown as modified.

Use case

For our PII data we make use of the external_location setting in order to ensure that this data is written to a different S3 bucket, so we can make use of a bucket-specific access control policy.

A typical model config then looks as follows:

{{
    config(
        materialized='table'
        , external_location=get_pii_external_location_path()
        , tags=['daily', 'pii']
    )
}}

And the corresponding get_pii_external_location_path() is defined as:

{%- macro get_pii_external_location_path() -%}
    {#
        This macro is used for overriding the `external_location` config setting
        of PII models.
        Each model in the PII directory/schema must make use of it, in order
        to ensure that the model's data will get written to the PII S3 bucket.
    -#}

    {{ 's3://aws-athena-tables-data-pii/dbt/' ~ target.name ~ '/' ~ env_var('DBT_SCHEMA_NAME', this.schema) ~ '/' ~ this.name ~ '/' ~ run_started_at.isoformat() ~ '/' ~ invocation_id }}

{%- endmacro -%}

So we always use the same designated PII bucket but tweak the paths depending on the target.name and an environment variable DBT_SCHEMA_NAME, so concurrent CI runs don't clash.

If we now compile the project for --target prod and then compare what would be run in --target dev, we get all the PII models detected as changed:

dbt ls --state ./target -s state:modified --target dev

upgrade to support dbt-core v1.4.0

Background

The latest version of dbt Core,dbt-core==1.4.0, was published on January 25, 2023 (PyPI | Github). In fact, a patch, dbt-core==1.4.1 (PyPI | Github), was also released on the same day.

How to upgrade

dbt-labs/dbt-core#6624 is an open discussion with more detailed information. If you have questions, please put them there! dbt-labs/dbt-core#6849 is for keeping track of the community's progress on releasing 1.4.0

The above linked guide has more information, but below is a high-level checklist of work that would enable a successful 1.4.0 release of your adapter.

  • support Python 3.11 (only if your adapter's dependencies allow)
  • Consolidate timestamp functions & macros
  • Replace deprecated exception functions
  • Add support for more tests

the next minor release: 1.5.0

FYI, dbt-core==1.5.0 is expected to be released at the end of April. Please plan on allocating more effort to upgrade support compared to previous minor versions. Expect to hear more in the middle of April.

At a high level, expect much greater adapter test coverage (a very good thing!) and some likely heavy renaming and restructuring as the API-ification of dbt-core is now well underway. See https://github.com/dbt-labs/dbt-core/milestone/82 for more information.

Separate table type from table format

👋🏻 Hello

Right now, when we use our adapter we can set format to parquet, iceberg, json, etc. The issue is that with the new implementation of create_table_as we are mixing table_type and table_format, and we enforce that Iceberg tables use the Parquet format. This is OK right now because Parquet is the best format to pair with Iceberg, but it would be great to have 2 parameters when defining models:

  • table_type: hive or iceberg
  • format: parquet, avro, json, text

What do you think?
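
For illustration, a model config under that proposal might look like this (just a sketch of the suggested parameters, not the adapter's current API):

{{
    config(
        materialized='table'
        , table_type='iceberg'
        , format='parquet'
    )
}}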

Implement linting via github actions

I read the call for help in Slack and wanted to ask: what kind of linting do you want?

My suggestion (based on what I usually implement in Python repos and actually like nowadays):

  • black and isort for code formatting
  • flake8 + mypy for finding code problems
  • pre-commit for running all the linters on commit or on demand (for faster cycle times, as you can run the same stuff as in CI), including some linters for whitespace/EOF/... things; see e.g. the Wolt cookiecutter example
  • GitHub Actions to run all pre-commit-defined linters on all files

I usually add a small Makefile to run the linters/formatters locally on demand (instead of using pre-commit as a git pre-commit hook) and to set up the local dev virtualenv, either via pip from requirements.txt or via poetry/pyproject.toml, which seems to be the new standard in Python packaging (my preference would be the latter; requirements.txt can still be autogenerated).

Not sure what to use for tests; usually it's pytest, but I have no idea how dbt adapters are usually tested -> leave it out for now?

Would that meet your needs and is this what you had in mind?

Invalid incremental strategy provided: None

I noticed this in a couple of my projects already, after switching to dbt 1.3.0 and 1.3.1.

Given a simple model with this config block:

{{
    config(materialized='incremental')
}}

When I deploy this model using dbt, it throws:

23:09:13.423010 [error] [MainThread]: Compilation Error in model event_login (models/silver/content/XXX.sql)
23:09:13.423138 [error] [MainThread]:   Invalid incremental strategy provided: None
23:09:13.423268 [error] [MainThread]:       Expected one of: 'append', 'insert_overwrite'
23:09:13.423403 [error] [MainThread]:   
23:09:13.423529 [error] [MainThread]:   > in macro validate_get_incremental_strategy (macros/materializations/models/incremental/helpers.sql)
23:09:13.423653 [error] [MainThread]:   > called by macro materialization_incremental_athena (macros/materializations/models/incremental/incremental.sql)
23:09:13.423777 [error] [MainThread]:   > called by model event_login (models/silver/content/XXX.sql)

I can reproduce this in both the Tomme athena adapter and this new dbt-athena adapter when I'm using dbt 1.3.1.
Therefore, it seems like something that was introduced with dbt 1.3? When I switch to dbt-core==1.2.1 it works fine. However, I can't find anything related in the dbt 1.3.0 or 1.3.1 release changelogs...

It's weird because we set insert_overwrite as a default here:

{% set raw_strategy = config.get('incremental_strategy', default='insert_overwrite') %}
{% set strategy = validate_get_incremental_strategy(raw_strategy) %}

And then we call validate_get_incremental_strategy which throws the error.

When I debug the incremental.sql file:

{% set raw_strategy = config.get('incremental_strategy', default='insert_overwrite') %}
{{ log("RAW STRATEGY: " ~ raw_strategy, True) }}
{% set strategy = validate_get_incremental_strategy(raw_strategy) %}

Then the console prints RAW STRATEGY: None when I run dbt. Somehow the default is not being applied since dbt 1.3?!
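
If the cause is that dbt 1.3 now returns an explicit None for a key that exists but is unset, a defensive pattern on the macro side would be to coalesce it. This is only a sketch, not necessarily how the adapter resolves it:

{# falls back to insert_overwrite even when config.get returns None #}
{% set raw_strategy = config.get('incremental_strategy') or 'insert_overwrite' %}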

The quick fix is to add an explicit incremental_strategy='insert_overwrite' in the model config block (see the example below). However, I'd like to highlight this issue because the default stopped working in this macro... 🙏
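
The workaround in the model config block is simply the same model as above, with the strategy made explicit:

{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite'
    )
}}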

Issue on consecutive run on incremental model

The first run works; when I run it a second time the model gets stuck. No error is raised.

{{ config(
    materialized='incremental',
    format='parquet',
    incremental_strategy='insert_overwrite',
    partitioned_by=['report_date']
) }}

SELECT
    999999 AS cost,
    cast(t.ts AS date) as report_date
FROM unnest(sequence(cast('2022-10-01' AS date), cast('2022-12-04' AS date), INTERVAL '1' Day)) AS t(ts)
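
For completeness, insert_overwrite models usually limit the second run with an is_incremental() guard so that only newer partitions are rewritten; the model above rebuilds the full date range on every run. A sketch of the usual pattern (illustrative only, not necessarily related to the hang):

SELECT
    999999 AS cost,
    cast(t.ts AS date) AS report_date
FROM unnest(sequence(cast('2022-10-01' AS date), cast('2022-12-04' AS date), INTERVAL '1' DAY)) AS t(ts)
{% if is_incremental() %}
-- only rewrite partitions newer than what already exists in the target table
WHERE cast(t.ts AS date) > (SELECT max(report_date) FROM {{ this }})
{% endif %}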

Partition Limitations

Quick question, wasn't sure where else to post this. The README mentions that the plugin only supports the creation of 100 partitions. Is that 100 partition values (i.e. partitioning by date, you can only create 100 days' worth of partitions) or 100 partition fields (i.e. I can partition by 100 fields, field1 -> field100)?

Getting random "mismatched input 'cascade'." during dbt test

First of all: Thank you for your amazing contribution and bringing Athena engine v3 features to the DBT community!

I am currently tracking a bug that seems to occur non-deterministically. Essentially, during dbt test, I get syntax errors like these:

botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:100: mismatched input 'cascade'. Expecting: '.', <EOF>

It happens during the initial cleanup phase when DBT tries to delete existing test tables. To me, it looks like sometimes the macro drop_relation from this repo is not found and DBT falls back to using default__drop_relation, which does a

drop {{ relation.type }} if exists {{ relation }} cascade

Below is the relevant log output of such an error:

-- /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "prelive", "node_id": "test.data_importer.not_null_bestaende_cleaned_lagerbestand.d2103837db"} */
12:21:35.254829 [debug] [Thread-7 (]: Opening a new connection, currently in state closed
Failed to execute query.
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/pyathena/common.py", line 494, in _execute
    query_id = retry_api_call(
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/pyathena/util.py", line 68, in retry_api_call
    return retry(func, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/tenacity/__init__.py", line 406, in __call__
    do = self.iter(retry_state=retry_state)
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/tenacity/__init__.py", line 351, in iter
    return fut.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/tenacity/__init__.py", line 409, in __call__
    result = fn(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/pysetup/.venv/lib/python3.10/site-packages/botocore/client.py", line 960, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:102: mismatched input 'cascade'. Expecting: '.', <EOF>
12:21:35.631219 [debug] [Thread-8 (]: Athena adapter: Error running SQL: drop table if exists dbt_test__audit.dbt_expectations_expect_column_93d8b305ae3a3af2a182dc0d9423d314 cascade
-- /* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "athena", "target_name": "prelive", "node_id": "test.data_importer.dbt_expectations_expect_column_value_lengths_to_equal_rueres_wm_ek__wm_ek_IS_NOT_NULL__5.bc16ec6d3f"} */
12:21:35.631863 [debug] [Thread-8 (]: Athena adapter: Error running SQL: macro drop_relation
12:21:35.632307 [debug] [Thread-8 (]: finished collecting timing info
12:21:35.632645 [debug] [Thread-8 (]: On test.data_importer.dbt_expectations_expect_column_value_lengths_to_equal_rueres_wm_ek__wm_ek_IS_NOT_NULL__5.bc16ec6d3f: Close
12:21:35.633406 [debug] [Thread-8 (]: Runtime Error in test dbt_expectations_expect_column_value_lengths_to_equal_rueres_wm_ek__wm_ek_IS_NOT_NULL__5 (models/host/schema.yml)

I had some success on my local machine by adding a simple Jinja comment in front of this line.

Is it possible that there is a race condition when registering the macro, which leads to spurious errors if the macro is not yet registered by the time test tables are being deleted? I am not very familiar with the underlying process of how DBT adapters work together with DBT core, just guessing here.
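
As a stop-gap while the root cause is investigated, a project-level drop_relation override that omits CASCADE would keep the fallback from producing Athena-invalid SQL. A minimal sketch, under the assumption that a project macro takes precedence over the global default (this is not the adapter's actual macro):

{% macro drop_relation(relation) -%}
    {# Athena does not accept CASCADE, so drop the relation without it #}
    {% call statement('drop_relation', auto_begin=False) -%}
        drop {{ relation.type }} if exists {{ relation }}
    {%- endcall %}
{%- endmacro %}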

Prune path before using a CTA

What

To avoid HIVE_PATH_ALREADY_EXISTS, ensure that the path is pruned. Check the implementation here; in our case we can build something similar.

Iceberg tables should avoid using that, as the table drop takes care of pruning the location.

dbt_valid_to date should be set to updated_at column date value

The dbt_valid_to date should be set to the updated_at column's value, not the current timestamp.

Per the documentation: https://docs.getdbt.com/docs/build/snapshots#snapshot-meta-fields

WHEN dbt_valid_to=CAST('9999-01-01' as timestamp) AND is_current_record=True THEN {{ current_timestamp() }}

This is causing overlaps in our valid date ranges.
Here are the metadata columns for one id:
id updated_at dbt_valid_from dbt_valid_to is_current_record
47a555a1-1455-481a-8937-d3cc03f02362 2021-09-02 13:58:09.961000 2021-09-02 13:58:09.961000 2023-02-14 20:05:44.433000 false
47a555a1-1455-481a-8937-d3cc03f02362 2021-09-03 13:57:59.941000 2021-09-03 13:57:59.941000 9999-01-01 00:00:00.000000 true

This was implemented with the following pull request:
#111
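
In other words, when a newer version of a record supersedes the current one, the expectation is roughly the following (pseudo-SQL for illustration; updated_at_of_newer_record is a placeholder, not an actual column produced by the snapshot macro):

-- expected when closing out the previous version of a record
WHEN dbt_valid_to = CAST('9999-01-01' AS timestamp) AND is_current_record = True
    THEN updated_at_of_newer_record   -- the strategy's updated_at value
    -- instead of {{ current_timestamp() }}, which produces overlapping ranges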

Zero Downtime options for table materialization

See this PR: Tomme/dbt-athena#95

Also, using Iceberg it should be quite easy:

ALTER TABLE target RENAME TO target_bkp;
ALTER TABLE target_tmp RENAME TO target;

DROP TABLE target_bkp; -- only if all is fine

It's not 100% zero-downtime, but almost.

For non-Iceberg tables we can do something like this:
