
outrauk / dataiku-plugin-snowflake-hdfs


DSS plugin for fast loading between Snowflake and HDFS

Home Page: https://github.com/outrauk/dataiku-plugin-snowflake-hdfs

License: MIT License

Python 92.92% Makefile 1.18% Shell 5.90%
dataiku snowflake hdfs parquet

dataiku-plugin-snowflake-hdfs's Issues

Snowflake to HDFS syncs integers as decimals

If a table in Snowflake stores integers in a column of type NUMBER, the values are output to Parquet as decimal.

Snowflake stores int and bigint columns as NUMBER, which the unload writes to Parquet as int64 (DECIMAL(18,0)) and DSS then reads back as decimal.

message schema {
  optional binary misc (UTF8);
  optional binary event_type (UTF8);
  optional binary source (UTF8);
  optional int64 uprn (DECIMAL(18,0));
  optional int64 event_date (TIMESTAMP_MILLIS);
  optional int64 virtual_load_date (TIMESTAMP_MILLIS);
}

Fix: explicitly cast to bigint:

 COPY INTO '@PUBLIC.STAGE/event_hdfs/part'
 FROM (
     SELECT "misc", "event_type", "source", "uprn":bigint AS "uprn", "event_date", "virtual_load_date"
     FROM "DSSMANAGED"."EVENT_DATA_events_sf_mk"
 )
 FILE_FORMAT = (TYPE = PARQUET)
 OVERWRITE = TRUE
 HEADER = TRUE;

Also in AOBS-564

Timestamp problems

There is an issue with the plugin when timestamps are not NTZ (no time zone): Snowflake cannot unload TIMESTAMP_TZ and TIMESTAMP_LTZ columns to Parquet, so some casting may be needed (a sketch of one possible cast follows the error message below).
Message:

Error in Python process: At line 38: <class 'Exception'>: None: b'During query \'\nCOPY INTO \'@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/EVENT_DATA/events_hdfs/part\'\nFROM "DSSMANAGED"."EVENT_DATA_events_sf"\nFILE_FORMAT = (TYPE = PARQUET)\nOVERWRITE = TRUE\nHEADER = TRUE;\n \': Error encountered when unloading to PARQUET: TIMESTAMP_TZ and LTZ types are not supported for unloading to Parquet., caused by: SnowflakeSQLException: Error encountered when unloading to PARQUET: TIMESTAMP_TZ and LTZ types are not supported for unloading to Parquet.'
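
A minimal sketch of the casting workaround, assuming the COPY INTO statement is built in Python as the plugin does; the column names and types below are hypothetical. TIMESTAMP_TZ and TIMESTAMP_LTZ columns are cast to TIMESTAMP_NTZ so Snowflake will unload them to Parquet:

# Hypothetical column metadata; in practice this would come from the dataset's schema
columns = {"misc": "TEXT", "event_type": "TEXT", "event_date": "TIMESTAMP_TZ"}

select_items = []
for name, sf_type in columns.items():
    if sf_type in ("TIMESTAMP_TZ", "TIMESTAMP_LTZ"):
        # Strip the time zone so the Parquet unload is accepted
        select_items.append(f'"{name}"::TIMESTAMP_NTZ AS "{name}"')
    else:
        select_items.append(f'"{name}"')
select_clause = ", ".join(select_items)

copy_sql = f"""
COPY INTO '@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/EVENT_DATA/events_hdfs/part'
FROM (
    SELECT {select_clause}
    FROM "DSSMANAGED"."EVENT_DATA_events_sf"
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE;
"""
print(copy_sql)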

Partitioning issue

When syncing a partitioned dataset, the partition is not propagated as a column in the target Snowflake dataset: the COPY INTO selects only the columns present in the Parquet files, so the column count does not match the target table (see the sketch after the error below).

Here is an example error message from a dataset with one partition column:
Error in Python process: At line 38: <class 'Exception'>: None: b'During query \'\nCOPY INTO "DSSMANAGED"."MOVEDATESPARK_forecast_deduped_sf"\nFROM (\n SELECT $1:"uprn", $1:"trigger", $1:"event_date", $1:"move", $1:"move_prob", $1:"predicted_date", $1:"forecast_confidence_pm0", $1:"forecast_confidence_pm1", $1:"forecast_confidence_pm2", $1:"forecast_confidence_pm3", $1:"date_weeks", $1:"move_type"\n FROM \'@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/MOVEDATESPARK/forecasts_deduped_hdfs/\'\n)\nFILE_FORMAT = (TYPE = PARQUET, SNAPPY_COMPRESSION = TRUE)\nPATTERN = \'.*\\.snappy\\.parquet\'\nFORCE = TRUE;\n \': SQL compilation error:\nInsert value list does not match column list expecting 13 but got 12, caused by: SnowflakeSQLException: SQL compilation error:\nInsert value list does not match column list expecting 13 but got 12'
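
One possible workaround, sketched below purely as an illustration (not the plugin's actual behaviour): since the partition value lives in the HDFS path rather than in the Parquet files, supply it as a literal in the SELECT list so the column counts line up (13 expected vs 12 present). All names and the partition value are hypothetical.

# Columns that actually exist inside the Parquet files (12 of them in the error above)
parquet_columns = ["uprn", "trigger", "event_date", "move"]  # truncated for brevity
# The partition column only exists in the HDFS path, not in the files (hypothetical)
partition_column, partition_value = "run_date", "2020-07-01"

select_items = [f'$1:"{c}"' for c in parquet_columns]
# Append the partition value as a constant so the target table's column count is met
select_items.append(f"'{partition_value}' AS \"{partition_column}\"")
select_clause = ", ".join(select_items)

copy_sql = f"""
COPY INTO "DSSMANAGED"."MOVEDATESPARK_forecast_deduped_sf"
FROM (
    SELECT {select_clause}
    FROM '@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/MOVEDATESPARK/forecasts_deduped_hdfs/'
)
FILE_FORMAT = (TYPE = PARQUET, SNAPPY_COMPRESSION = TRUE)
PATTERN = '.*\\.snappy\\.parquet'
FORCE = TRUE;
"""
print(copy_sql)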

sync_snowflake_to_hdfs recipe doesn't handle dates correctly

Snowflake tables with date and timestamp columns (with or without a timezone), when synced back to Dataiku via sync_snowflake_to_hdfs, are imported as int and bigint, respectively. This appears to be an issue with how Dataiku reads Parquet files rather than with the plugin itself. In particular, it appears DSS expects dates and times to be stored only in the legacy, now-deprecated int96 representation. Its deprecation is described in PARQUET-323 and implemented in apache/parquet-format#86. Snowflake's Parquet implementation outputs dates as annotated int32 and timestamps as int64 (using logical type annotations).

Additional information about dates, times, and Parquet is described in this article.

Snowflake's Parquet schema:

message schema {
  optional int32 PROPERTY_VALUE_DATE (DATE);
  optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
  optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}

DSS's Parquet schema:

message hive_schema {
  optional int96 PROPERTY_VALUE_DATE;
  optional int96 PROPERTY_VALUE_TIMESTAMP;
  optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}

Requested DSS Fix

Even if DSS continues to output int96, at least make sure the importer recognises and supports the more modern int32 and int64 representations.

Update: this is on Dataiku's backlog but not a high priority

Possible Workarounds

Under the current implementation, a user could add a Prepare recipe to convert the values. In Dataiku's words (a pandas sketch of the same conversion follows the quote):

You can convert the int32 version to a date with a prepare recipe using 86400 * the_int32_column and parsing this as a Unix timestamp. The int64 is likely a Unix timestamp (in milliseconds) too that you can also parse with a prepare recipe.
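
A minimal pandas sketch of that conversion (an illustration, not part of the plugin; column names match the example schema above and the values are made up): the int32 DATE column is days since the Unix epoch, and the int64 TIMESTAMP_MILLIS columns are milliseconds since the epoch.

import pandas as pd

df = pd.DataFrame({
    "PROPERTY_VALUE_DATE": [18500, 18501],                       # days since 1970-01-01
    "PROPERTY_VALUE_TIMESTAMP": [1598486400000, 1598572800000],  # milliseconds since epoch
})

# 86400 * days gives seconds since the epoch, which parses as a Unix timestamp
df["PROPERTY_VALUE_DATE"] = pd.to_datetime(df["PROPERTY_VALUE_DATE"] * 86400, unit="s")
df["PROPERTY_VALUE_TIMESTAMP"] = pd.to_datetime(df["PROPERTY_VALUE_TIMESTAMP"], unit="ms")
print(df.dtypes)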

A potential workaround would be to use PyArrow to read in Snowflake's Parquet files and write out new files using the use_deprecated_int96_timestamps flag on pyarrow.parquet.write_table, for example:
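
A minimal sketch of that approach, assuming the Snowflake-produced file has been copied locally (file names are hypothetical):

import pyarrow.parquet as pq

table = pq.read_table("part_0_0_0.snappy.parquet")

# Rewrite with the legacy int96 representation that DSS expects for timestamps.
# Note: this flag only affects timestamp columns; an int32 DATE column stays int32,
# so it may still need to be cast to a timestamp before or after the rewrite.
pq.write_table(
    table,
    "part_0_0_0.int96.snappy.parquet",
    compression="snappy",
    use_deprecated_int96_timestamps=True,
)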

We could also choose to convert any date/time columns to strings and rely on users to add a Prepare recipe that parses them back to datetimes.

We could also fall back to CSV rather than Parquet, but that's undesirable for many reasons (and defeats the whole point of this plugin).

How to replicate

Prereqs:

  • A STAGE or other location to COPY INTO from Snowflake
  • parquet-tools (on macOS, brew install parquet-tools)

You can either use Snowflake and DSS to create example Parquet files, or use the two files in the attached parquet_examples.zip:

  • part-r-00000.snappy.parquet -- file from Dataiku with int96
  • part_0_0_0.snappy.parquet -- file from Snowflake with int32 and int64

First, create a sample parquet file in Snowflake:

COPY INTO '@PUBLIC.MY_S3_STAGE/dss_bug/part'
FROM (
  SELECT 
    t.example_ts::DATE AS property_value_date, 
    t.example_ts::TIMESTAMP AS property_value_timestamp, 
    t.example_ts::TIMESTAMP_NTZ AS property_value_timestamp_ntz 
  FROM 
    (
      VALUES (GETDATE()), (DATEADD(HOUR, 2, GETDATE())), (DATEADD(DAY, -3, GETDATE()))
    ) AS t (example_ts)
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE
;

Copy the file locally and inspect the schema using parquet-tools schema:

$ parquet-tools schema part_0_0_0.snappy.parquet
message schema {
  optional int32 PROPERTY_VALUE_DATE (DATE);
  optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
  optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}
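
If parquet-tools isn't available, PyArrow can print the same schema; a small aside, assuming the file has been copied locally:

import pyarrow.parquet as pq

# Equivalent of `parquet-tools schema` for a local file
print(pq.read_schema("part_0_0_0.snappy.parquet"))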

Also, create a "Hadoop HDFS dataset" in Dataiku that points to this file. Note that the schema gets loaded as int for dates and bigint for timestamps:
[screenshot]

NB: we've tried picking various flavours of Parquet (Hive, Pig, and Spark) and manually setting the schema's data type to date with no success.

Now, in Dataiku, create a dummy dataset with the same types. It can be Snowflake, or anything else. Example:
[screenshot]

Add a Sync recipe with "Parquet" as the Format, and run it:
[screenshot]

Copy the file locally and again run parquet-tools schema (this output also appears in the Activity Log of the sync execution):

$ parquet-tools schema part-r-00000.snappy.parquet
message hive_schema {
  optional int96 PROPERTY_VALUE_DATE;
  optional int96 PROPERTY_VALUE_TIMESTAMP;
  optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}

As you can see from the Parquet schema, Dataiku outputs int96 for both date and timestamp values.

Type conversions from Snowflake to HDFS

I have noticed some conversion problems with certain types when doing the bulk unload from Snowflake to HDFS. Specifically:

  • Columns with type NUMBER (such as uprn, with 12 digits) convert to a DecimalType in Parquet instead of an IntegerType or LongType. This can be fixed by a cast to bigint in the SQL query:
COPY INTO @<managed_stage> FROM (
    SELECT "problematic_column"::bigint AS "problematic_column_fixed"
    FROM <table>)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE;

KeyError when Snowflake table has no schema

If a Snowflake dataset has no schema set (e.g., because the table is in PUBLIC), the plugin fails with:

[2020/07/29-16:57:23.386] [null-err-102] [INFO] [dku.utils]  - *************** Recipe code failed **************
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils]  - Begin Python stack
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils]  - Traceback (most recent call last):
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils]  -   File "/mnt/secondary/dataiku_data/jobs/KLABERSB2/Build_dss_sf_date_test_hdfs_2020-07-29T16-57-12.834/compute_dss_sf_date_test_hdfs_NP/custom-python-recipe/pyouthmC6vTuQNYAr/python-exec-wrapper.py", line 194, in <module>
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils]  -     exec(f.read())
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils]  -   File "<string>", line 16, in <module>
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils]  -   File "/mnt/secondary/dataiku_data/plugins/dev/snowflake-hdfs/python-lib/config.py", line 54, in get_table_name
[2020/07/29-16:57:23.389] [null-err-102] [INFO] [dku.utils]  -     return f'"{params["schema"]}"."{params["table"]}"'.replace('${projectKey}', project_key)
[2020/07/29-16:57:23.389] [null-err-102] [INFO] [dku.utils]  - KeyError: 'schema'

This is because get_config()["params"] does not return a schema value (a sketch of a defensive fix follows the listing below):

{'connection': 'outra-sf-dataiku-db',
 'notReadyIfEmpty': False,
 'mode': 'table',
 'partitioningType': 'custom',
 'normalizeDoubles': True,
 'table': 'DSS_SF_DATE_TEST',
 'tableCreationMode': 'auto',
 'writeInsertBatchSize': 10000,
 'writeJDBCBadDataBehavior': 'DISCARD_ROW',
 'readColsWithUnknownTzAsDates': False,
 'readSQLDateColsAsDSSDates': True}

[screenshot]
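
A minimal sketch of a defensive fix, shown purely as an illustration (not the plugin's actual patch): tolerate the missing "schema" key and fall back to an unqualified table name.

def get_table_name(params: dict, project_key: str) -> str:
    # "schema" is absent when the dataset points at a table in PUBLIC
    schema = params.get("schema")
    if schema:
        qualified = f'"{schema}"."{params["table"]}"'
    else:
        qualified = f'"{params["table"]}"'
    return qualified.replace('${projectKey}', project_key)

With the params shown above, this would return "DSS_SF_DATE_TEST" (quoted) instead of raising KeyError.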

make_dss_pip does not follow redirects

make_dss_pip.sh breaks because Dataiku has changed the URL for the "latest" download. It can be fixed by making the curl call follow redirects (-L); the script should also check that it actually found a version. Looks like this:

VERSION=$(curl -L -v --silent https://www.dataiku.com/dss/trynow/linux/ 2>&1 | grep -e 'var cur_version' | sed -n 's/^.*"\(.*\)".*$/\1/p')

if [ -z "$VERSION" ]; then
  echo "Can't find a DSS version" >&2
  exit 1
fi
