outrauk / dataiku-plugin-snowflake-hdfs
DSS plugin for fast loading between Snowflake and HDFS
Home Page: https://github.com/outrauk/dataiku-plugin-snowflake-hdfs
License: MIT License
If a table in Snowflake that stores integers has a type of `NUMBER`, it gets output to Parquet as decimal. Snowflake tables with `int` or `bigint` columns store them in Snowflake as `NUMBER`, which is then output to Parquet as `int64 (DECIMAL(18,0))` and read back into DSS as decimal.
message schema {
optional binary misc (UTF8);
optional binary event_type (UTF8);
optional binary source (UTF8);
optional int64 uprn (DECIMAL(18,0));
optional int64 event_date (TIMESTAMP_MILLIS);
optional int64 virtual_load_date (TIMESTAMP_MILLIS);
}
Fix: explicitly cast as `bigint`:
COPY INTO '@PUBLIC.STAGE/event_hdfs/part'
FROM (
SELECT "misc", "event_type", "source", "uprn"::bigint AS "uprn", "event_date", "virtual_load_date"
FROM "DSSMANAGED"."EVENT_DATA_events_sf_mk"
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE;
Also in AOBS-564
There is an issue with the plugin when timestamps are not in NTZ format; some casting may be required.
Message:
Error in Python process: At line 38: <class 'Exception'>: None: b'During query \'\nCOPY INTO \'@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/EVENT_DATA/events_hdfs/part\'\nFROM "DSSMANAGED"."EVENT_DATA_events_sf"\nFILE_FORMAT = (TYPE = PARQUET)\nOVERWRITE = TRUE\nHEADER = TRUE;\n \': Error encountered when unloading to PARQUET: TIMESTAMP_TZ and LTZ types are not supported for unloading to Parquet., caused by: SnowflakeSQLException: Error encountered when unloading to PARQUET: TIMESTAMP_TZ and LTZ types are not supported for unloading to Parquet.'
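A possible fix (not something the plugin currently does; a sketch only) is to cast `TIMESTAMP_TZ` and `TIMESTAMP_LTZ` columns to `TIMESTAMP_NTZ` in the generated `COPY INTO` select list. A minimal Python sketch, assuming a Snowflake connector cursor and example schema/table names; note the cast drops the timezone, which may or may not be acceptable:

# Sketch only: build a SELECT list that casts TZ/LTZ timestamps to NTZ so the
# Parquet unload succeeds. Assumes a DB-API style Snowflake cursor.
def build_select_list(cursor, schema, table):
    cursor.execute(
        "SELECT column_name, data_type FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position",
        (schema, table),
    )
    items = []
    for name, data_type in cursor.fetchall():
        if data_type in ("TIMESTAMP_TZ", "TIMESTAMP_LTZ"):
            items.append(f'"{name}"::TIMESTAMP_NTZ AS "{name}"')
        else:
            items.append(f'"{name}"')
    return ", ".join(items)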
When syncing a partitioned dataset, the target Snowflake dataset does not see the partitions propagated as columns.
Here is an example of the error message on a dataset with one partition variable:
Error in Python process: At line 38: <class 'Exception'>: None: b'During query \'\nCOPY INTO "DSSMANAGED"."MOVEDATESPARK_forecast_deduped_sf"\nFROM (\n SELECT $1:"uprn", $1:"trigger", $1:"event_date", $1:"move", $1:"move_prob", $1:"predicted_date", $1:"forecast_confidence_pm0", $1:"forecast_confidence_pm1", $1:"forecast_confidence_pm2", $1:"forecast_confidence_pm3", $1:"date_weeks", $1:"move_type"\n FROM \'@PUBLIC.OUTRA_DATA_DATAIKU_EMR_MANAGED_STAGE/projects/MOVEDATESPARK/forecasts_deduped_hdfs/\'\n)\nFILE_FORMAT = (TYPE = PARQUET, SNAPPY_COMPRESSION = TRUE)\nPATTERN = \'.*\\.snappy\\.parquet\'\nFORCE = TRUE;\n \': SQL compilation error:\nInsert value list does not match column list expecting 13 but got 12, caused by: SnowflakeSQLException: SQL compilation error:\nInsert value list does not match column list expecting 13 but got 12'
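A possible workaround (a sketch only, not what the plugin does today; the column and value names below are hypothetical) is to append the partition value as a literal in the `COPY INTO` select list, since the staged Parquet files do not contain the partition column itself:

# Sketch: the Parquet files for a partition don't carry the partition column, so the
# target table expects one more column than the files provide. Add it back as a literal.
parquet_columns = ["uprn", "trigger", "event_date"]   # columns present in the files (truncated)
partition_column = "partition_date"                   # hypothetical partition column name
partition_value = "2020-07-27"                        # value of the partition being synced

select_items = ['$1:"%s"' % c for c in parquet_columns]
select_items.append("'%s' AS \"%s\"" % (partition_value, partition_column))
select_clause = ",\n    ".join(select_items)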
Snowflake tables with date and timestamp columns (with or without timezone), when synced back to Dataiku via `sync_snowflake_to_hdfs`, are imported as `int` and `bigint`, respectively. This appears to be an issue with how Dataiku reads Parquet files rather than with the plugin itself. In particular, it appears DSS expects dates and times to be stored only using the legacy and now deprecated `int96` representation. Its deprecation is described in PARQUET-323 and implemented in apache/parquet-format#86. Snowflake's Parquet implementation outputs dates as annotated `int32` and timestamps as `int64` (as LogicalTypes).
Additional information about dates, times, and Parquet is described in this article.
Snowflake's Parquet schema:
message schema {
optional int32 PROPERTY_VALUE_DATE (DATE);
optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}
DSS's Parquet schema:
message hive_schema {
optional int96 PROPERTY_VALUE_DATE;
optional int96 PROPERTY_VALUE_TIMESTAMP;
optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}
Even if DSS continues to output `int96`, at least make sure the importer recognises and supports the more modern `int32` and `int64` representations.
Update: this is on Dataiku's backlog but not a high priority
Under the current implementation, a user could work around this with a Prepare recipe. In Dataiku's words:
You can convert the int32 version to a date with a prepare recipe using 86400 * the_int32_column and parsing this as a Unix timestamp. The int64 is likely a Unix timestamp (in milliseconds) too that you can also parse with a prepare recipe.
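For reference, the same conversion outside of a Prepare recipe (a sketch with made-up values, assuming pandas is available) would look like:

import pandas as pd

# As DSS reads the columns back: DATE as days since the Unix epoch (int32),
# TIMESTAMP_MILLIS as milliseconds since the Unix epoch (int64).
df = pd.DataFrame({
    "property_value_date": [18472],               # 2020-07-29
    "property_value_timestamp": [1596038400000],  # 2020-07-29 16:00:00 UTC
})
df["property_value_date"] = pd.to_datetime(df["property_value_date"] * 86400, unit="s")
df["property_value_timestamp"] = pd.to_datetime(df["property_value_timestamp"], unit="ms")
print(df.dtypes)  # both columns are now datetime64[ns]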
A potential workaround would be to use PyArrow to read in Snowflake's Parquet files and write out new files using the `use_deprecated_int96_timestamps` flag on `pyarrow.parquet.write_table`.
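A minimal sketch of that workaround, assuming pyarrow is installed and using the example file names from the repro below; note that the flag only affects timestamp columns, so a `DATE` column would still be written as `int32`:

import pyarrow.parquet as pq

# Read Snowflake's Parquet output and rewrite it with legacy int96 timestamps.
table = pq.read_table("part_0_0_0.snappy.parquet")
pq.write_table(
    table,
    "part_0_0_0.int96.snappy.parquet",      # hypothetical output name
    compression="snappy",
    use_deprecated_int96_timestamps=True,   # write TIMESTAMP columns as int96
)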
We could also choose to convert any date/time columns to strings and then rely on users to add a Prepare recipe that parses them back to dates and timestamps.
We could also fall back to CSV rather than Parquet, but that's undesirable for many reasons (and defeats the whole point of this plugin...).
Prereqs:
- A `STAGE` or other location to `COPY INTO` from Snowflake
- `parquet-tools` (on macOS, `brew install parquet-tools`)

You can either use Snowflake and DSS to create example Parquet files, or use the two files in the attached parquet_examples.zip file:
- `part-r-00000.snappy.parquet` -- file from Dataiku with `int96`
- `part_0_0_0.snappy.parquet` -- file from Snowflake with `int32` and `int64`
First, create a sample parquet file in Snowflake:
COPY INTO '@PUBLIC.MY_S3_STAGE/dss_bug/part'
FROM (
SELECT
t.example_ts::DATE AS property_value_date,
t.example_ts::TIMESTAMP AS property_value_timestamp,
t.example_ts::TIMESTAMP_NTZ AS property_value_timestamp_ntz
FROM
(
VALUES (GETDATE()), (DATEADD(HOUR,2,getdate())), (DATEADD(DAY,-3,GETDATE())) AS t (example_ts)
)
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE
;
Copy the file locally and inspect the schema using `parquet-tools schema`:
$ parquet-tools schema part_0_0_0.snappy.parquet
message schema {
optional int32 PROPERTY_VALUE_DATE (DATE);
optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}
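If `parquet-tools` isn't available, the same schema can be printed with pyarrow (assuming it is installed):

import pyarrow.parquet as pq

print(pq.read_schema("part_0_0_0.snappy.parquet"))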
Also, create a "Hadoop HDFS dataset" in Dataiku that points to this file. Note that the schema gets loaded as `int` for dates and `bigint` for timestamps:
NB: we've tried picking various flavours of Parquet (Hive, Pig, and Spark) and manually setting the schema's data type to `date`, with no success.
Now, in Dataiku, create a dummy dataset with the same types. It can be Snowflake, or anything else. Example:
Add a Sync recipe with "Parquet" as the Format, and run it:
Copy the file locally and again run `parquet-tools schema` (this output also appears in the Activity Log of the sync execution):
$ parquet-tools schema part-r-00000.snappy.parquet
message hive_schema {
optional int96 PROPERTY_VALUE_DATE;
optional int96 PROPERTY_VALUE_TIMESTAMP;
optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}
As you can see from the Parquet schema, Dataiku outputs `int96` for the date and timestamp values.
I have noticed some conversion problems with certain types when doing the bulk unload from Snowflake to HDFS. Specifically: `NUMBER` columns (such as uprn, with 12 digits) convert to a `DecimalType` in Parquet, instead of an `IntegerType` or `LongType`. This can be fixed by a cast to `bigint` in the SQL query:

COPY INTO @<managed_stage> FROM (
  SELECT "problematic_column"::bigint AS "problematic_column_fixed"
  FROM <table>
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE;
If a Snowflake table has no schema (e.g., because it's in `PUBLIC`), the plugin fails with:
[2020/07/29-16:57:23.386] [null-err-102] [INFO] [dku.utils] - *************** Recipe code failed **************
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils] - Begin Python stack
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils] - Traceback (most recent call last):
[2020/07/29-16:57:23.387] [null-err-102] [INFO] [dku.utils] - File "/mnt/secondary/dataiku_data/jobs/KLABERSB2/Build_dss_sf_date_test_hdfs_2020-07-29T16-57-12.834/compute_dss_sf_date_test_hdfs_NP/custom-python-recipe/pyouthmC6vTuQNYAr/python-exec-wrapper.py", line 194, in <module>
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils] - exec(f.read())
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils] - File "<string>", line 16, in <module>
[2020/07/29-16:57:23.388] [null-err-102] [INFO] [dku.utils] - File "/mnt/secondary/dataiku_data/plugins/dev/snowflake-hdfs/python-lib/config.py", line 54, in get_table_name
[2020/07/29-16:57:23.389] [null-err-102] [INFO] [dku.utils] - return f'"{params["schema"]}"."{params["table"]}"'.replace('${projectKey}', project_key)
[2020/07/29-16:57:23.389] [null-err-102] [INFO] [dku.utils] - KeyError: 'schema'
This is because `get_config()["params"]` does not return a `schema` value:
{'connection': 'outra-sf-dataiku-db',
'notReadyIfEmpty': False,
'mode': 'table',
'partitioningType': 'custom',
'normalizeDoubles': True,
'table': 'DSS_SF_DATE_TEST',
'tableCreationMode': 'auto',
'writeInsertBatchSize': 10000,
'writeJDBCBadDataBehavior': 'DISCARD_ROW',
'readColsWithUnknownTzAsDates': False,
'readSQLDateColsAsDSSDates': True}
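A possible fix (a sketch, not the plugin's current code; the signature and the `PUBLIC` default are assumptions) is to fall back to a default schema when the params don't include one:

# Sketch of a fallback in config.py's get_table_name(); defaulting to PUBLIC is an assumption.
def get_table_name(params, project_key):
    schema = params.get("schema", "PUBLIC")
    return f'"{schema}"."{params["table"]}"'.replace("${projectKey}", project_key)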
Not sure why I made the decision to restrict HDFS data sets to those compressed with Snappy. This should be changed (or documented).
`make_dss_pip.sh` breaks because Dataiku has changed the URL for the "latest" download. It can be fixed by changing the `curl` call to follow redirects (`-L`), and the script should also check that it actually found a version. Something like this:
VERSION=$(curl -L -v --silent https://www.dataiku.com/dss/trynow/linux/ 2>&1 | grep -e 'var cur_version' | sed -n 's/^.*"\(.*\)".*$/\1/p')
if [ -z "$VERSION" ]; then
  echo "Can't find a DSS version"
  exit 1
fi