
vertica / spark-connector


This component acts as a bridge between Spark and Vertica, allowing the user to either retrieve data from Vertica for processing in Spark, or store processed data from Spark into Vertica.

License: Apache License 2.0

Scala 98.14% Dockerfile 0.53% Shell 1.33%

spark-connector's People

Contributors

ai-bq, alexey-temnikov, alexr-bq, aryex, ches, jbobson98, jeremyprime, jonathanl-bq, nerdlogic, ravjotbrar, valentina-bq


spark-connector's Issues

Merge less cols without copy_column_list

Currently, the connector requires a copy column list if the number of columns in the DataFrame is less than the number of columns in the target table. This will be fixed by inferring the schema from the tempTable when building the merge statement (see the sketch after the acceptance criteria below).

Df schema: (col1, col2, col3)
Target table schema: (col1, col2, col3, col4)

Acceptance criteria: only the first three columns in the target table should be affected.
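
A rough sketch of the scenario, assuming the V2 source name and option keys ("com.vertica.spark.datasource.VerticaSource", "merge_key", "staging_fs_url", and the connection options) as they commonly appear in connector examples; the values are placeholders:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("merge-fewer-cols").getOrCreate()
    import spark.implicits._

    // The DataFrame has only col1, col2, col3; the target table also has col4.
    val df = Seq((1, "a", 10.0), (2, "b", 20.0)).toDF("col1", "col2", "col3")

    df.write
      .format("com.vertica.spark.datasource.VerticaSource")
      .option("host", "vertica-host")
      .option("db", "testdb")
      .option("user", "dbadmin")
      .option("password", "")
      .option("staging_fs_url", "hdfs://hdfs-host:8020/data/staging")
      .option("table", "target_table")
      .option("merge_key", "col1")      // merge on col1
      // No copy_column_list: the connector should infer the columns from the temp
      // table so that only col1-col3 of the target table are affected.
      .mode(SaveMode.Append)
      .save()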

Publish non-uber jars and/or relocate vendor libraries

Hi,

It seems the connector is published as an assembly JAR, without relocating packages of the third-party libraries. That's generally a frowned-upon practice.

For me it manifests as a problem because the connector depends on Scalactic in compile scope. Your version of Scalactic has a binary incompatibility with an older ScalaTest version that I'm currently limited to using in the test suite of an industrial project. I can't use build tool facilities to try excluding your Scalactic version, because it isn't a managed transitive dependency; vertica-spark brings it directly onto the classpath in /org/scalactic. This is hard to debug, since the source of errors is not the Scalactic version that I see in my dependency tree from my ScalaTest version; it's a sneaky one.

The connector might be able to eliminate its use of Scalactic as a runtime dependency, but that isn't the root issue; the same problem will likely arise with another vendored library for other users.

I understand there is a desire for shaded uber-jar packages in some cases. I believe good practice, and what I'd like to suggest, is:

  1. Publish an artifact with no shading, allowing users to manage transitive dependencies with the facilities of their build tools.
  2. Publish an artifact with shaded dependencies under a classifier such as shaded or assembly, as an expedient option for users that find binary compatibility problems in their builds.
  3. The shaded artifact must relocate all vendor packages to avoid classpath conflicts in user builds.

I feel the first should be the default artifact, with no classifier—it's best to promote modularity as the first option—but you might invert them if you prefer.

I can try taking a stab at the sbt setup for this, if maintainers agree with it.
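
Roughly, the sbt setup I have in mind would look something like the snippet below (build.sbt, assuming the sbt-assembly plugin; the relocation prefix is only an example):

    // Relocate vendored packages inside the shaded artifact.
    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename("org.scalactic.**" -> "com.vertica.spark.shaded.org.scalactic.@1").inAll
      // ...and similar rules for the other vendored libraries.
    )

    // Publish the assembly jar under an "assembly" classifier,
    // alongside the default (unshaded) artifact.
    Compile / assembly / artifact := {
      val art = (Compile / assembly / artifact).value
      art.withClassifier(Some("assembly"))
    }
    addArtifact(Compile / assembly / artifact, assembly)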

Thanks for your work on the new connector!

Add option to create external table out of written connector data

This covers half of the requested external table functionality. The scope here is replacing the copy step of the connector with a create external table step.

Outline of feature:

  • New option: create_external_table - boolean defaulting to false
  • Normal table should not be created
  • Copy and Cleanup steps disabled
  • Create external table step added
  • Verification of external table step added

Acceptance criteria:

  • Unit tests
  • Integration tests -- include String length mismatch and Decimal precision mismatch
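
A sketch of what the write path might look like with this option, assuming the V2 source name and the other option keys; only "create_external_table" comes from this issue, everything else is a placeholder:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("external-table-write").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    df.write
      .format("com.vertica.spark.datasource.VerticaSource")
      .option("host", "vertica-host")
      .option("db", "testdb")
      .option("user", "dbadmin")
      .option("staging_fs_url", "hdfs://hdfs-host:8020/data/external")
      .option("table", "ext_table")
      // Skip the normal table creation, copy, and cleanup; create an external table instead.
      .option("create_external_table", "true")
      .mode(SaveMode.Append)
      .save()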

Spike: External Table Support

Explore functionality and prototype.

  1. Add option for creating an external table rather than loading data into Vertica
     • Compare the schema used with INFER_EXTERNAL_TABLE_DDL
     • Test reading from the external table afterwards

  2. Add option for skipping the write step, and only creating the external table based on existing parquet files
     • Test with partitioned parquet files

Acceptance criteria:

  • Document outlining solution
  • Prototype
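
For the schema comparison step, a rough JDBC sketch (the exact INFER_EXTERNAL_TABLE_DDL signature varies across Vertica versions, so verify it against the docs for the target server; host, credentials, and paths are placeholders):

    import java.sql.DriverManager

    val conn = DriverManager.getConnection(
      "jdbc:vertica://vertica-host:5433/testdb", "dbadmin", "")
    try {
      val stmt = conn.createStatement()
      // Ask Vertica what DDL it would infer for the exported parquet files,
      // then compare that with the schema the connector used for the export.
      val rs = stmt.executeQuery(
        "SELECT INFER_EXTERNAL_TABLE_DDL('hdfs:///data/external/*.parquet', 'ext_table')")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }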

Contribution to Spark: "done" state on read

The Spark V2 datasource API read side does not have any mechanism for performing overall operation cleanup on the driver node once the operation is completed.

This issue proposes contributing that functionality to Spark: a function called upon operation completion, with context indicating whether the operation failed or succeeded.
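
A hypothetical sketch of the kind of hook this proposal describes; this is not an existing Spark API, just an illustration of the desired driver-side callback:

    import org.apache.spark.sql.connector.read.Scan

    // Hypothetical extension point layered on top of the existing DSv2 Scan.
    trait ScanWithCleanup extends Scan {
      /** Called once on the driver when the read operation finishes.
        * @param succeeded whether the overall operation completed successfully */
      def onReadCompleted(succeeded: Boolean): Unit
    }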

Fix scripts for Pyspark and s3 examples

The PySpark example doesn't include a script to spin up a Spark cluster; as a result, without the correct initial setup, running the "run-s3-example.sh" script may fail with an error.

This change adds automation for the PySpark example to make it easier to run. Previously, we didn't install Python3 or set up the Spark cluster, so this makes it easier to get started.

It also adds comments to the Python script to make it easier for people unfamiliar with PySpark to understand what the script is doing.

Spike: complex types

Currently, the V2 connector does not support complex data types (ARRAY, MAP, ROW). Trying to convert Vertica's ARRAYs into Spark ArrayTypes is a bit complicated due to ArrayType being abstract.

  • Gather knowledge from the Vertica team on what changed with complex types in 11.0 - specifically with regard to parquet export and copy
  • Prototype
  • Split into task(s)
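
A minimal sketch of the direction this spike explores: mapping a Vertica ARRAY element type to a Spark ArrayType. The type-name handling is illustrative only, not a complete mapping:

    import org.apache.spark.sql.types._

    def verticaArrayToSpark(elementTypeName: String): ArrayType = {
      val elementType: DataType = elementTypeName.toLowerCase match {
        case "int" | "integer"  => LongType      // Vertica INTEGER is 64-bit
        case "float"            => DoubleType
        case "varchar" | "char" => StringType
        case "boolean"          => BooleanType
        case other              => throw new IllegalArgumentException(s"Unsupported element type: $other")
      }
      ArrayType(elementType, containsNull = true)
    }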

Determine new best default values for row group and file size

There is a bug, now fixed, that may cause Vertica to reserve much more disk space than needed for an export. For maximal support of older Vertica versions, we should investigate how the row group and file size parameters affect this behavior, and change the default values accordingly.

Acceptance criteria:

  • Report on how these parameters affect this bug
  • Change of default parameters to reflect this
  • Change to performance guide reflecting how to modify these parameters for optimal performance

Set Parquet Export Page Size Dynamically

There is an undocumented option in the parquet export: pageSizeKB.
Lowering it reduces the total memory that must be reserved per thread during parquet export, allowing for more threads.
The per-thread memory equation is:
(2 x RowGroupSize) + (4 x ColNum x PageSize)

  1. Experiment with setting this lower -- see the performance impact in a non-memory-bound setting

  2. Assuming the decrease is not significant -- or is deemed worth the tradeoff -- set the page size relative to the row group size. Something like:
    pageSize = max(8kb, rowGroupSize/colCount)
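
A worked example of the per-thread equation and the proposed rule; the numbers are illustrative, not recommended defaults:

    val rowGroupSizeBytes = 16L * 1024 * 1024   // 16 MB row groups
    val colCount          = 64
    val pageSizeBytes     = 1024L * 1024        // example current page size: 1 MB

    // (2 x RowGroupSize) + (4 x ColNum x PageSize)
    val memoryPerThread = 2 * rowGroupSizeBytes + 4 * colCount * pageSizeBytes
    println(s"Reserved per thread: ${memoryPerThread / (1024 * 1024)} MB")  // 32 + 256 = 288 MB

    // Proposed rule: pageSize = max(8 KB, rowGroupSize / colCount)
    val proposedPageSize   = math.max(8L * 1024, rowGroupSizeBytes / colCount)  // 256 KB here
    val memoryWithProposal = 2 * rowGroupSizeBytes + 4 * colCount * proposedPageSize
    println(s"With proposed page size: ${memoryWithProposal / (1024 * 1024)} MB")  // 32 + 64 = 96 MB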

Release 2.0.1

Acceptance Criteria:

  • external tables use case 1
  • YARN
  • sparklyr example
  • Merge statements
  • create changelog
  • run integration tests and manual tests on final build
  • create GitHub artifacts

Determine Optimal Partition Count

The working hypothesis: the partition count should be at or near the number of cores in the Spark cluster.

The goal of this issue is to thoroughly test this and get solid numbers on performance at different partition counts versus the cores available.
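
A sketch of the kind of read the benchmark would run, with the partition count tied to the cores available; the "num_partitions" option name and the source name are assumptions to confirm against the connector docs:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("partition-count-test").getOrCreate()
    val coreCount = spark.sparkContext.defaultParallelism  // roughly the total cores in the cluster

    val df = spark.read
      .format("com.vertica.spark.datasource.VerticaSource")
      .option("host", "vertica-host")
      .option("db", "testdb")
      .option("user", "dbadmin")
      .option("table", "big_table")
      .option("staging_fs_url", "hdfs://hdfs-host:8020/data/staging")
      .option("num_partitions", coreCount.toString)
      .load()

    println(s"Partitions: ${df.rdd.getNumPartitions}, cores: $coreCount")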

Test with Databricks Spark

Look into Databricks Spark and how it can work with our connector.

  • What licensing is needed?
  • What differences are there?
  • Run full test suite if possible

SSO: Kerberos Integration Tests in CI/CD (Github Actions)

  1. Move the Docker code from the custom repo to the spark-connector repo.
  2. Fine-tune the Docker code to ensure it is fully automated and doesn't require manual steps.
  3. Run the automation from the GitHub Action on the PR event.
  4. Ensure we do not impact the current non-Kerberos integration tests in GitHub Actions.

Update examples and functional tests to use WebHDFS

WebHDFS will be supported as a default configuration. Examples and functional tests need to be updated to use WebHDFS. More information about WebHDFS can be found here.

The following changes will be performed:

  1. Documentation will be updated to recommend using WebHDFS.
  2. Example applications will be updated to use WebHDFS.
  3. Integration tests will be updated to use WebHDFS.
  4. A small number of tests are expected to remain on HDFS.
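
In the examples and tests, the change amounts to pointing the intermediate storage at WebHDFS instead of plain HDFS. A sketch, assuming the "staging_fs_url" option key and using placeholder hosts and ports:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("webhdfs-example").getOrCreate()

    val opts = Map(
      "host"           -> "vertica-host",
      "db"             -> "testdb",
      "user"           -> "dbadmin",
      "table"          -> "dftest",
      // before: "hdfs://hdfs-host:8020/data"
      "staging_fs_url" -> "webhdfs://hdfs-host:50070/data"
    )

    val df = spark.read
      .format("com.vertica.spark.datasource.VerticaSource")
      .options(opts)
      .load()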

Application Flow Diagram

Acceptance Criteria:

Create a high level application flow diagram, with accompanying lower-level read and write flow diagrams. These diagrams should help contributors understand the connector and how the different components work together in the project.

Use a smart client_label on each connection to vertica

Each connection made to Vertica should set a client label that identifies its purpose. This helps identify and track down problems. It also has the added benefit of helping quantify customer usage if customers share their scrutinize data with Vertica.

Here are some thoughts on what that could look like. Some of these might not be possible, and we could add more later if needed.

vspark-<cluster id>[-config details][--purpose]
where <cluster id> is some scrubbed string that identifies this Spark cluster,
and where the optional [-config details] might be:
-vs - the version of the vertica spark connector
-sp - the version of spark
-sc - the version of scala
-py - the version of python
-ps - the version of pyspark
-n - the spark node number?
-m - the number of spark nodes in cluster
-k - kerberos was used to connect
-p - password was used to connect
-c - increment for each connection made from this node

e.g.
vspark-MyCluster5-vs2.0.1-sp3.1.0-sc2.13.1-n3-m12-k-c1--main
vspark-MyCluster5-vs2.0.1-sp3.1.0-sc2.13.1-n3-m12-k-c2--background_monitoring
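
A sketch of building such a label and attaching it to a connection. The Vertica JDBC "Label" connection property is used here on the assumption that it sets the session's client label; verify against the JDBC driver documentation. All values are placeholders:

    import java.sql.DriverManager
    import java.util.Properties

    val clusterId = "MyCluster5"  // scrubbed string identifying this Spark cluster
    val label = s"vspark-$clusterId-vs2.0.1-sp3.1.0-sc2.13.1-n3-m12-k-c1--main"

    val props = new Properties()
    props.setProperty("user", "dbadmin")
    props.setProperty("password", "")
    props.setProperty("Label", label)

    val conn = DriverManager.getConnection("jdbc:vertica://vertica-host:5433/testdb", props)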

[Spike] S3 Kerberos Authentication

  1. Identify environment and effort to configure environment
  2. Reach out to Benjamin to get more details on how Kerberos with S3 is used or anticipated to be used
  3. Build prototype for S3 Kerberos (might be a manual test / environment configuration)

Acceptance Criteria:

  1. Detailed scope and estimations for S3 Kerberos support

Remove unnecessary methods in TestUtils

It is currently wasteful to provide all the functionality in TestUtils with every example. Therefore, we can remove the unnecessary methods and possibly inline the rest.

Prototype for Merge statements


With the implementation of merge statements in our connector, users can process raw data in Spark and merge that data with an existing table in Vertica. The processed data will be written to a temporary table in Vertica before being merged. In the existing table, matched records will be updated and new records will be inserted, effectively constituting an “Upsert”. In order to execute a merge statement in our Spark Connector, the user needs to pass in a mergeKey, which will likely be an array of column attributes to join the existing table and temporary table on. If this option is not populated, a write will be performed as usual without the use of a temporary table.

Example use case: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/AdministratorsGuide/Tables/MergeTables/MergeExample.htm

Cache dependencies in GitHub Actions workflows

Currently, for some of our GitHub Actions workflows (e.g. S3 integration test runs), we use wget to fetch dependencies such as Spark and Hadoop. This takes a long time and may not be reliable if the URL changes or goes down. To solve this, we should cache dependencies. This repo seems to detail a way to make that possible: https://github.com/actions/cache

Refactor examples' script code

There is common code between all the scripts for each example that should be extracted out to minimize redundancy. A possible solution is to move all of this code into the Dockerfile so that it is only run when the containers are composed up.

Set up Eon Accelerator to test our connector

  • As a user, you will log into https://www.vertica.com/accelerator and use that interface to talk indirectly with your own secure cloud. The link between the customer cloud and the Vertica cloud is established during onboarding using cross-account IAM access. There is currently no fee for the duration of the early access program; the customer just needs to take care of their own AWS bill. It comes with 5 databases per account and a primary plus 3 sub-clusters per database.

What do I need to get started?

  • AWS account
  • AWS admin access to set up cross-account IAM access

Merge Statements

With the implementation of merge statements in our connector, users can process raw data in Spark and merge that data with an existing table in Vertica. The processed data will be written to a temporary table in Vertica before being merged. In the existing table, matched records will be updated and new records will be inserted, effectively constituting an “Upsert”. In order to execute a merge statement in our Spark Connector, the user needs to pass in a mergeKey, which will likely be an array of column attributes to join the existing table and temporary table on. If this option is not populated, a write will be performed as usual without the use of a temporary table.

Example use case: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/AdministratorsGuide/Tables/MergeTables/MergeExample.htm
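
A sketch of the proposed user-facing flow; the "merge_key" option name, the source name, and the shape of the generated statement are assumptions based on this issue, not a final API:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("merge-demo").getOrCreate()
    import spark.implicits._

    val updates = Seq((1, "alice", 30), (4, "dana", 22)).toDF("id", "name", "age")

    updates.write
      .format("com.vertica.spark.datasource.VerticaSource")
      .option("host", "vertica-host")
      .option("db", "testdb")
      .option("user", "dbadmin")
      .option("staging_fs_url", "hdfs://hdfs-host:8020/data/staging")
      .option("table", "people")
      .option("merge_key", "id")  // key(s) joining the temporary table and the target
      .mode(SaveMode.Append)
      .save()

    // Conceptually, the connector would then run something like:
    //   MERGE INTO people t USING tmp_people s ON t.id = s.id
    //   WHEN MATCHED THEN UPDATE SET name = s.name, age = s.age
    //   WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (s.id, s.name, s.age);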

Write Pipe Tests:

  • It should create a temporary table if merge key exists

  • It should copy data to a temporary table if merge key exists

Integration Tests:

  • It should merge with an existing table in Vertica

Additional Requirements:

  • Make mergeKey a List of mergeKeys

  • What happens if we’re in overwrite mode when we also want to perform a merge?
    • Possible solution: force append mode in the logic if merge key exists

  • Consider when schema of target table is different from dataframe (using copyColumnList)
  • True temporary table created as part of session
  • Performance testing comparing a merge to a normal write
  • Side note: if the merge key is not unique in the data frame, merge won’t work

Merge Statements Demo

Create a demo to show merge statements functionality:

  1. Regular merge with overwrite mode
  2. Using copy column list
  3. Multiple columns

Important issues to note:

  • Merge key columns must have the same names in the target table and the df
  • Merge key values must be unique in the df
  • The number of columns in the df must be less than or equal to the number of columns in the target table (as long as we specify which columns are affected in the target table using a copy column list)

Schema mismatch: option for logging example data

It's been requested to log an example of data that doesn't match the schema. If added, this should be an option, as logging real data by default is a security concern.

It would be good to check this issue together with #293.

Change Spark download mirror site

The current mirror site in our scripts is not functional. This issue is evident when we try to run the S3 integration tests using the script that downloads Spark and Hadoop. The same link is also used in the PySpark example. We will change to a link in the Apache archive.

Audit Existing V1 open bugs

AC: Review existing V1 open bugs and create tickets for use cases that should be added to V2 coverage, ensuring that a) the bug is resolved and b) coverage is added for that use case if it didn't exist before.
