spark-snowflake's Introduction

spark-snowflake

Snowflake Data Source for Apache Spark.

The main version of spark-snowflake works with Spark 2.4. For use with Spark 2.3 and 2.2, please use the tags vx.x.x-spark_2.3 and vx.x.x-spark_2.2.

To use it, provide Spark with the dependency in the form net.snowflake:spark-snowflake_$SCALA_VERSION:$RELEASE, e.g. net.snowflake:spark-snowflake_2.11:2.2.2. See Maven Central for more info.
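For illustration only (the exact coordinates are placeholders to adjust for your Spark and Scala versions; the JDBC driver version shown is an assumption), the dependency can be added to an sbt build along these lines:

// build.sbt sketch
libraryDependencies ++= Seq(
  "net.snowflake" %% "spark-snowflake" % "2.2.2",  // resolves to spark-snowflake_2.11 on Scala 2.11
  "net.snowflake" %  "snowflake-jdbc"  % "3.6.12"
)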

For a version working with Spark 1.5 and 1.6, please use branch-1.x. Artifacts of that version are also available in the Snowflake UI.

For a user manual and more information, see the official documentation.

For developer notes, see README-DEV

Acknowledgments

This project was originally forked from the spark-redshift project. We also occasionally port relevant patches from it. We would like to acknowledge and thank the developers of that project, in particular Josh Rosen.

spark-snowflake's People

Contributors

adparker, antonycourtney, binglihub, brkyvz, cfregly, devpog, dohongdayi, eduardoramirez, emlyn, etduwx, gregrahn, jaley, joshrosen, koeninger, linchan825, marcinzukowski, marmbrus, mckim0928, mengxr, mingli-rui, mjg232, sameeraxiomine, sfc-gh-bli, sfc-gh-ema, sfc-gh-jfan, sfc-gh-mrui, sfc-gh-spandey, sfc-gh-sshankar, sfc-gh-zli, shashankgowdal

spark-snowflake's Issues

spark-snowflake writing a dataframe ignores option error_on_column_count_mismatch=false

Environment

  • python 2.7
  • spark 2.2.0
  • snowflake-jdbc-3.4.2.jar
  • spark-snowflake_2.11-2.2.8.jar
  • emr 5.11.1

Setup

Create a Snowflake table that has an auto-incrementing id column, using the file format option error_on_column_count_mismatch=false:

create table my_db.my_schema.test_table (
id integer autoincrement primary key,
user_id integer
)
stage_file_format = (TYPE=PARQUET, ERROR_ON_COLUMN_COUNT_MISMATCH=FALSE);

Create a dataframe that only has user_id:

from pyspark.sql import Row

data = [Row(user_id=x) for x in [99, 98, 97]]
df = spark.createDataFrame(data)
df.write.format('jdbc')...

Issue

Writing a dataframe that has fewer columns than the destination table raises a net.snowflake.client.jdbc.SnowflakeSQLException and suggests using the file format option error_on_column_count_mismatch=false to ignore the error. When following the suggestion, the same exception occurs. This operation works on MySQL and Redshift, but not on Snowflake.

Expected behavior

Assuming the setup above, calling a Spark JDBC write on a dataframe with column user_id should write the data into the destination table that has columns id, user_id, incrementing the autoincrement id column while populating the user_id column with data from the dataframe. (Confirmed this works using snowflake-sqlalchemy and Snowflake SQL.)

Assume a setup where you have columns user_id, name, zip_code on a Snowflake table and a dataframe with columns user_id, name. Calling a Spark JDBC write should populate the user_id and name columns in the destination table, while leaving zip_code NULL.

Temporary workarounds

Although not ideal, we use the following workarounds for both behaviors; this may be useful for other readers/users.

For behavior 1, we write the dataframe with column user_id into a temporary table that only has user_id and then perform an INSERT ... SELECT statement using sqlalchemy.

For behavior 2, we generate the missing columns in the dataframe (e.g. df.withColumn('zip_code', F.lit(None))) and then successfully write the dataframe into the destination table, filling NULL for the columns that were originally missing.

SaveMode.Overwrite causes a table swap without the temp table gaining the initial table's properties

SaveMode.Overwrite performs a table swap in the background, causing all clustering information and metadata to be lost in the process.

The create statement should be

CREATE TABLE IF NOT EXISTS my_unique_table_staging_1054285534 LIKE my_unique_table

instead of

CREATE TABLE IF NOT EXISTS my_unique_table_staging_1054285534 (user_id INTEGER)

Grants (COPY GRANTS) and clustering keys should be carried over as well.
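As a possible mitigation on the caller's side (a sketch only, not a fix to the connector), the truncate_table and usestagingtable options can keep the original table in place during an overwrite, so grants and clustering keys survive; df and sfOptions are assumed to be the usual dataframe and connection options:

import org.apache.spark.sql.SaveMode

df.write
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("dbtable", "my_unique_table")
  .option("truncate_table", "on")    // truncate the existing table instead of recreating it
  .option("usestagingtable", "off")  // write directly, skipping the staging-table swap
  .mode(SaveMode.Overwrite)
  .save()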

Unable to get data to snowflake with binary columns correctly

Hi,
I am trying to read and write data from/to Snowflake using Spark. I am unable to read binary data correctly, and this causes an issue when writing it back to Snowflake in a binary column. I am creating a dataset and writing it back to a different table.
The created dataset has the correct schema and dataset.show() fetches the correct values, but the Row[] created using select has incorrect values in it. I have tried HEX, BASE64 and UTF-8 encodings for the Snowflake output format, as per the doc linked below.
e.g.:

dataset.show()
+-------------+
|            B|
+-------------+
|[48 45 4C 50]|
+-------------+
(Row[]) dataset.select("B").head(10);
- row - 1
0 - [[B@7fc9d2f5], StructType(StructField(B,BinaryType,true))
	- value - [72, 69, 76, 80]

// Steps to reproduce

  1. Referring to this doc,
ALTER User SET BINARY_OUTPUT_FORMAT = 'HEX';
ALTER User SET BINARY_INPUT_FORMAT = 'HEX';
create table db.sch.demo_binary_hex (b binary);

insert into db.sch.demo_binary_hex (b) select to_binary(hex_encode('HELP'), 'HEX');

select * from db.sch.demo_binary_hex;
select to_varchar(b), hex_decode_string(to_varchar(b)) from db.sch.demo_binary_hex;
  2. Create the dataset with Spark in Java
dataset = dataframeReader.format(sourceName).options(sfOptions)
					.option("query", queryAlias.toString()).load();
dataset.show();
+-------------+
|            B|
+-------------+
|[48 45 4C 50]|
+-------------+
// Row[]
(Row[]) dataset.select("B").head(10)
0 - [[B@2674b632], StructType(StructField(B,BinaryType,true))
          - [72, 69, 76, 80]
  3. Pass this dataset to the writer
DataFrameWriter<Row> dataframeWriter = dataset.write();
dataframeWriter.format(sourceName).options(sfOptions)
					.option("dbtable", notCreatedTable).mode(SaveMode.Append).save();
// Overwrite also doesn't work
// Append creates the table but throws an exception when inserting data

exception -
SnowflakeUtil.checkErrorAndThrowExceptionSub(JsonNode, boolean) line: 152

String '[B@2674b632' is not a legal hex-encoded string
  File 'LzW5juDOmh/0.CSV.gz', line 1, character 1
  Row 1, column "DEMO_BINARY_HEX_1_STAGING_1359460749"["B":1]
  If you would like to continue loading when an error is encountered, use other values such as 'SKIP_FILE' or 'CONTINUE' for the ON_ERROR option. For more information on loading options, please run 'info loading_data' in a SQL client.

trace

net.snowflake.client.jdbc.SnowflakeSQLException: String '[B@2674b632' is not a legal hex-encoded string
  File 'LzW5juDOmh/0.CSV.gz', line 1, character 1
  Row 1, column "DEMO_BINARY_HEX_1_STAGING_1359460749"["B":1]
  If you would like to continue loading when an error is encountered, use other values such as 'SKIP_FILE' or 'CONTINUE' for the ON_ERROR option. For more information on loading options, please run 'info loading_data' in a SQL client.
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:152)
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77)
	at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
	at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
	at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:575)
	at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:265)
	at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:203)
	at net.snowflake.client.core.SFStatement.execute(SFStatement.java:874)
	at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:259)
	at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:181)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:317)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:315)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$1.apply(SnowflakeJDBCWrapper.scala:355)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

// Additional
I have tried BASE64 input/output, but it gives the same exception; I also tried UTF-8, although Snowflake doesn't let you set the output format to UTF-8.
https://docs.snowflake.com/en/user-guide/binary-input-output.html
https://docs.snowflake.com/en/sql-reference/parameters.html#binary-input-format
I am using Java 8, Spark 2.3.1, and net.snowflake:spark-snowflake_2.11:2.6.0-spark_2.3.
This is on Snowflake 4.11.2.

Saving an empty dataframe throws an exception

Version details:
net.snowflake:spark-snowflake_2.11:2.5.1-spark_2.4
net.snowflake:snowflake-jdbc:3.8.5
Spark 2.4.3
Scala 2.11.12

It seems that trying to save an empty dataframe throws an exception in the spark snowflake connector code. Is this the intended behavior?

import org.apache.spark.sql.types.{StructType,StructField,StringType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode

val sfOptions = Map(
    "sfURL" -> url,
    "sfUser" -> user,
    "sfPassword" -> password,
    "sfDatabase" -> database,
    "sfSchema" -> schema,
    "sfWarehouse" -> warehouse,
    "keep_column_case" -> "on"
)

val schema_string = "id,name"

val schema_rdd = StructType(schema_string.split(",").map(fieldName => StructField(fieldName, StringType, true)))

val empty_df = spark.sqlContext.createDataFrame(sc.emptyRDD[Row], schema_rdd)

empty_df.write.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "empty_table").mode(SaveMode.Overwrite).save

The error is:

java.util.NoSuchElementException: head of empty list
  at scala.collection.immutable.Nil$.head(List.scala:431)
  at scala.collection.immutable.Nil$.head(List.scala:428)
  at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:59)
  at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:53)
  at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:103)
  at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:134)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  ... 49 elided

Add Support for Ingesting Spark Array data types from a Snowflake Array data type

Currently, when a column data type in Snowflake is ARRAY (e.g. result of split), the resulting data type when ingested through Spark cannot be read using an Array data type. The sample code below demonstrates the limitation. The result of reading an ARRAY column is a java.lang.String value which cannot be converted to an Array.

Error

SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 25, 10.0.251.155, executor 2): java.lang.IllegalArgumentException: The value (["127","0","0","1"]) of the type (java.lang.String) cannot be converted to an array of string

Sample Code to Show Limitation

import org.apache.spark.sql.types._
import net.snowflake.spark.snowflake.Utils

Utils.runQuery(options, """CREATE SCHEMA IF NOT EXISTS databricks_demo_array_type""")
Utils.runQuery(options, """DROP TABLE IF EXISTS DD_ARRAY_TEST""")
Utils.runQuery(options, """create table dd_array_test as select split('127.0.0.1', '.') as num_array_x from adult limit 2""")

val customSchema = StructType(Array(
StructField("num_array_x", ArrayType(StringType, true), true)))

val dataset = spark.read.format("snowflake").options(options).schema(customSchema).option("dbtable", "dd_array_test").load()
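A possible client-side workaround (an illustration assuming Spark 2.2+, not connector behavior): read the ARRAY column with its default string type and parse the JSON text with from_json. spark, options, and the table/column names follow the sample above.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType}

// Load without the custom schema, then parse the JSON-encoded array text.
val raw = spark.read.format("snowflake").options(options)
  .option("dbtable", "dd_array_test").load()
val parsed = raw.withColumn("num_array_x",
  from_json(col("num_array_x"), ArrayType(StringType)))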

Slow record downloading

Hi, I was testing out the connector (2.5.6-spark2.4_scala2.11) and found that it was substantially slower than reading equivalent Parquet files. Some profiling showed that appending bytes one at a time to an ArrayBuffer inside SFRecordReader is really slow. I suggest using a raw byte buffer (large enough to fit most use cases) and offloading it to a regular ArrayBuffer when full. Doing that made the job run 2.5x faster.
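For illustration only, a minimal sketch of the buffering idea described above; the real SFRecordReader internals differ:

import scala.collection.mutable.ArrayBuffer

// Accumulate bytes in a fixed chunk and flush to the ArrayBuffer in bulk,
// avoiding one ArrayBuffer append per byte.
class ByteAccumulator(chunkSize: Int = 64 * 1024) {
  private val buffer = new ArrayBuffer[Byte]()
  private val chunk = new Array[Byte](chunkSize)
  private var pos = 0

  def append(b: Byte): Unit = {
    chunk(pos) = b
    pos += 1
    if (pos == chunkSize) flush()
  }

  private def flush(): Unit = {
    if (pos > 0) buffer ++= chunk.take(pos)
    pos = 0
  }

  def result(): Array[Byte] = { flush(); buffer.toArray }
}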

Issue when using "name" column_mapping with column names that have periods

Version details:
net.snowflake:spark-snowflake_2.11:2.5.1-spark_2.4
net.snowflake:snowflake-jdbc:3.8.5
Spark 2.4.3
Scala 2.11.12

If you use a combination of the column_mapping parameter set to name with a dataframe/table that has column names containing ., an exception is thrown when writing the dataframe. This comes from the removeUselessColumns method in SnowflakeWriter.scala, where it tries to select a new dataframe with only the necessary columns. The simple fix would be to surround each name fed to dataFrame.select(...) in removeUselessColumns with backticks, though I'm not sure what kind of edge cases would result from doing something like that. A sketch of that fix appears after the error output below.

import org.apache.spark.sql.SaveMode

val sfOptions = Map(
          "sfURL" -> url,
          "sfUser" -> user,
          "sfPassword" -> password,
          "sfDatabase" -> database,
          "sfSchema" -> schema,
          "sfWarehouse" -> warehouse,
          "keep_column_case" -> "on")

val df = Seq(
    (1),
    (2),
    (3)
).toDF("a.1")

// To create the table
df.write.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "dot_column").mode(SaveMode.Append).save          

// Turn on column_mapping
val sfOptions = Map(
          "sfURL" -> url,
          "sfUser" -> user,
          "sfPassword" -> password,
          "sfDatabase" -> database,
          "sfSchema" -> schema,
          "sfWarehouse" -> warehouse,
          "column_mapping" -> "name",
          "keep_column_case" -> "on")

// To cause the problem
df.write.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "dot_column").mode(SaveMode.Append).save

And the error:

java.lang.IllegalArgumentException: Incorrect column name when column mapping: org.apache.spark.sql.AnalysisException: cannot resolve '`a.1`' given input columns: [a.1];;
'Project ['a.1]
+- Project [value#1 AS a.1#3]
   +- LocalRelation [value#1]

  at net.snowflake.spark.snowflake.SnowflakeWriter.removeUselessColumns(SnowflakeWriter.scala:144)
  at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:101)
  at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:134)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  ... 49 elided

I also tried "columnmap" -> Map("`a.1`" -> "a.1").toString(), but then you get a different error saying that field `a.1` does not exist.
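For reference, a minimal sketch of the backtick fix suggested above (a simplified stand-in, not the connector's actual removeUselessColumns):

import org.apache.spark.sql.DataFrame

// Quote each kept column with backticks so names containing '.' resolve as
// plain column names rather than struct field paths.
def selectColumnsSafely(df: DataFrame, keep: Seq[String]): DataFrame =
  df.select(keep.map(name => df.col(s"`$name`")): _*)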

Streaming API failing with Status code 405 found in response from service

Reading data from Kafka and writing to Snowflake using the Snowflake streaming API, the error below occurs. Data is staged in the internal stage @streaming; after that it fails.

Can someone help me understand why the 405 error occurs, and how I could proceed to debug the issue?

INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with subject
INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer
INFO net.snowflake.ingest.connection.SecurityManager - Created new JWT
INFO net.snowflake.ingest.connection.RequestBuilder - Creating a RequestBuilder with arguments
INFO net.snowflake.ingest.SimpleIngestManager - Sending Request UUID -
INFO net.snowflake.ingest.connection.RequestBuilder - Created Insert Request :
INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall insert response - HttpResponseProxy{HTTP/1.1 405 Not Allowed [Content-Type: text/html, Date: Tue, 27 Aug 2019 02:56:49 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Content-Length: 166, Connection: keep-alive] ResponseEntityProxy{[Content-Type: text/html,Content-Length: 166,Chunked: false]}}
WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallInsert Response - 405
ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 405 found in response from service

Column names quoted with backticks (e.g. reserved words) are quoted incorrectly in `Utils.ensureQuoted`

The Utils.ensureQuoted and Utils.isQuoted functions do not respect identifiers that are already quoted with backticks and instead wrap the backtick-quoted string in an additional set of double quotes. This presents an issue in particular when specifying reserved-word column names in the columnmap option, as the generated SQL looks like:

COPY INTO table_name (..., "`values`", ...)

where values is the name of a column in the given table.

This causes an error:

SQL compilation error: error line 1 at position 102 invalid identifier '"`values`"'
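For illustration, a minimal sketch of the behavior being requested (a simplified stand-in, not the connector's actual Utils.ensureQuoted):

// Strip surrounding backticks before applying Snowflake's double-quote quoting,
// so `values` becomes "values" rather than "`values`".
def ensureQuotedSketch(name: String): String = {
  val stripped =
    if (name.length >= 2 && name.startsWith("`") && name.endsWith("`"))
      name.substring(1, name.length - 1)
    else name
  if (stripped.startsWith("\"") && stripped.endsWith("\"")) stripped
  else "\"" + stripped + "\""
}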

Spark 3.0 Connector

I am getting an error when using Snowflake from Spark 3.0 with Scala 2.12.

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.argString()Ljava/lang/String;

java.lang.IllegalArgumentException: Don't know how to save StructField(SSN,BinaryType,true) of type SSN to Snowflake

This exception is thrown when reading from a SQL Server table and saving to Snowflake, where the column named SSN is of data type NVARCHAR with collation set to "Latin1_General_BIN2".
Note that other tables with columns of data type NVARCHAR, but with collation set to "S", persist without any issues.
This might be a Snowflake JDBC issue; if so, please raise/reassign it on the Snowflake JDBC repo.

Here are more exception details:

Support for variant type in snowflake table

The current implementation does not support variant types and errors out when one of the data types in my dataframe is an array. Would it be possible to support converting ArrayType to JSON?
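As a possible workaround on the Spark side (an illustration assuming Spark 2.4+, not connector functionality), an array column can be serialized to a JSON string before writing and parsed back with PARSE_JSON in Snowflake; the column name "tags" is hypothetical:

import org.apache.spark.sql.functions.{col, to_json}

// Convert the ArrayType column to a JSON string column before writing.
val writable = df.withColumn("tags", to_json(col("tags")))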

Is this an open source project? Am I OK to fork and contribute?

java.util.MissingResourceException: Can't find bundle for base name net.snowflake.client.jdbc.version, locale en_GB

Using the latest connector (2.5.2) and Snowflake JDBC 3.9, I get the error below both when building from source and when referencing them from Maven Central:

java.lang.RuntimeException: Can't load localized resource bundle due to java.util.MissingResourceException: Can't find bundle for base name net.snowflake.client.jdbc.version, locale en_GB and can't load default resource bundle due to java.util.MissingResourceException: Can't find bundle for base name net.snowflake.client.jdbc.version, locale en_GB
  at net.snowflake.client.jdbc.internal.snowflake.common.core.ResourceBundleManager.reload(ResourceBundleManager.java:69)
  at net.snowflake.client.jdbc.internal.snowflake.common.core.ResourceBundleManager.<init>(ResourceBundleManager.java:37)
  at net.snowflake.client.jdbc.internal.snowflake.common.core.ResourceBundleManager.getSingleton(ResourceBundleManager.java:148)
  at net.snowflake.client.jdbc.SnowflakeDriver.<clinit>(SnowflakeDriver.java:42)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at net.snowflake.spark.snowflake.Utils$.classForName(Utils.scala:59)
  at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:109)
  at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:56)
  at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:53)
  at scala.Option.getOrElse(Option.scala:121)
  at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:53)
  at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:52)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

error: Symbol 'term net.snowflake.client' is missing from the classpath

Hi. When following the recommendation of importing the value for SNOWFLAKE_SOURCE_NAME:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

the following error is thrown due to a dependency on a class not found:

error: Symbol 'term net.snowflake.client' is missing from the classpath. This symbol is required by 'value net.snowflake.spark.snowflake.Utils.s3Client'.

Removing the problematic import, and replacing it with:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

solves the issue. What additional package needs to be included on the classpath to allow the import to succeed?

Note that this is all in the spark-shell, with the latest JDBC driver, along with the latest and most appropriate connector matching the Spark and Scala versions.

Issue with Databricks Spark Streaming: Private key must be specified in Snowflake streaming

We are using Databricks Spark to load data into Snowflake. It works perfectly with batch jobs but fails with streaming. Here is the code:

val options = Map(
"sfUrl" -> "********.snowflakecomputing.com",
"sfUser" -> "*****",
"sfPassword" -> "****",
"sfDatabase" -> "TEST_DB",
"sfSchema" -> "TEST_DOCUMENT",
"sfWarehouse" -> "COMPUTE_WH"
)

val rawStream = spark.readStream.schema(schema).json(path)
rawStream.writeStream.format("snowflake").options(options) .option("dbtable", "L_FEATURE_TEST").option("checkpointLocation", checkpointRaw).trigger(Trigger.Once()).start()

Error:

java.lang.IllegalArgumentException: requirement failed: Private key must be specified in Snowflake streaming
	at scala.Predef$.require(Predef.scala:224)
	at net.snowflake.spark.snowflake.SnowflakeSink.<init>(SnowflakeSink.scala:41)
	at net.snowflake.spark.snowflake.DefaultSource.createSink(DefaultSource.scala:137)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:305)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:330)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:1)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:64)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:66)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:68)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:70)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:72)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:74)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:76)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw$$iw.<init>(command-2679090388403770:78)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw$$iw.<init>(command-2679090388403770:80)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw$$iw.<init>(command-2679090388403770:82)
	at line92a282bca6f44a208d621b415f7ee12490.$read$$iw.<init>(command-2679090388403770:84)
	at line92a282bca6f44a208d621b415f7ee12490.$read.<init>(command-2679090388403770:86)
	at line92a282bca6f44a208d621b415f7ee12490.$read$.<init>(command-2679090388403770:90)
	at line92a282bca6f44a208d621b415f7ee12490.$read$.<clinit>(command-2679090388403770)
	at line92a282bca6f44a208d621b415f7ee12490.$eval$.$print$lzycompute(<notebook>:7)

Not sure if this is an issue. Is it possible to load streaming data using a username and password?

Spark-snowflake 2.2.0 doesn't work with new JDBC 3.2.1

I observe Caused by: java.lang.ClassNotFoundException: net.snowflake.client.jdbc.SnowflakeDriver when using spark-snowflake 2.2.0 after the publication of snowflake-jdbc 3.2.1.
I can reproduce the issue via spark-shell.
Succeeded with JDBC 3.2.0:

spark-shell --master local[2] --packages net.snowflake:snowflake-jdbc:3.2.0
Spark context available as 'sc' (master = local[2], app id = local-xxxxxxx).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :quit

Failed log with jdbc 3.2.1

spark-shell --master local[2] --packages net.snowflake:snowflake-jdbc:3.2.1
[init] error: error while loading <root>, Error accessing /home/hadoop/.ivy2/jars/net.snowflake_snowflake-jdbc-3.2.1.jar

Failed to initialize compiler: object java.lang.Object in compiler mirror not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.

Failed to initialize compiler: object java.lang.Object in compiler mirror not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.
Exception in thread "main" java.lang.NullPointerException
        at scala.reflect.internal.SymbolTable.exitingPhase(SymbolTable.scala:256)
        at scala.tools.nsc.interpreter.IMain$Request.x$20$lzycompute(IMain.scala:896)
        at scala.tools.nsc.interpreter.IMain$Request.x$20(IMain.scala:895)
        at scala.tools.nsc.interpreter.IMain$Request.headerPreamble$lzycompute(IMain.scala:895)
        at scala.tools.nsc.interpreter.IMain$Request.headerPreamble(IMain.scala:895)
        at scala.tools.nsc.interpreter.IMain$Request$Wrapper.preamble(IMain.scala:918)
        at scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1337)
        at scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1336)
        at scala.tools.nsc.util.package$.stringFromWriter(package.scala:64)
        at scala.tools.nsc.interpreter.IMain$CodeAssembler$class.apply(IMain.scala:1336)
        at scala.tools.nsc.interpreter.IMain$Request$Wrapper.apply(IMain.scala:908)
        at scala.tools.nsc.interpreter.IMain$Request.compile$lzycompute(IMain.scala:1002)
        at scala.tools.nsc.interpreter.IMain$Request.compile(IMain.scala:997)
        at scala.tools.nsc.interpreter.IMain.compile(IMain.scala:579)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:567)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
        at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:105)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

java.lang.ClassCastException: net.snowflake.client.jdbc.internal.snowflake.common.core.S3FileEncryptionMaterial cannot be cast to net.snowflake.client.jdbc.internal.snowflake.common.core.RemoteStoreFileEncryptionMaterial

Getting the following exception when upgrading to 2.2.6. Let me know what details I can provide:

java.lang.ClassCastException: net.snowflake.client.jdbc.internal.snowflake.common.core.S3FileEncryptionMaterial cannot be cast to net.snowflake.client.jdbc.internal.snowflake.common.core.RemoteStoreFileEncryptionMaterial
	at net.snowflake.spark.snowflake.ConnectorSFStageManager.encMat$lzycompute(ConnectorSFStageManager.scala:203)
	at net.snowflake.spark.snowflake.ConnectorSFStageManager.encMat(ConnectorSFStageManager.scala:201)
	at net.snowflake.spark.snowflake.ConnectorSFStageManager.masterKey$lzycompute(ConnectorSFStageManager.scala:231)
	at net.snowflake.spark.snowflake.ConnectorSFStageManager.masterKey(ConnectorSFStageManager.scala:230)
	at net.snowflake.spark.snowflake.SnowflakeRDD.<init>(SnowflakeRDD.scala:60)
	at net.snowflake.spark.snowflake.SnowflakeRelation.getRDDFromS3(SnowflakeRelation.scala:193)
	at net.snowflake.spark.snowflake.SnowflakeRelation.buildScan(SnowflakeRelation.scala:153)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$11.apply(DataSourceStrategy.scala:336)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$11.apply(DataSourceStrategy.scala:336)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:384)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:383)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:464)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:379)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:332)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$sparkPlan$1.apply(QueryExecution.scala:99)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$sparkPlan$1.apply(QueryExecution.scala:99)
	at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53)
	at org.apache.spark.sql.execution.QueryExecution.withFileAccessAudit(QueryExecution.scala:50)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:105)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2786)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2132)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2345)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:263)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:254)
	at scala.Option.map(Option.scala:146)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:254)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:228)
	at scala.Option.map(Option.scala:146)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.getResultBuffer(ScalaDriverLocal.scala:228)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:209)
	at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
	at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
	at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
	at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
	at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
	at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
	at scala.util.Try$.apply(Try.scala:192)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
	at java.lang.Thread.run(Thread.java:745)

Unparseable number exceptions

Hi,
Numerous people at my organization have been getting the following exception from their jobs:
[screenshot of the exception]

This appears to be due to fields that are null, but deemed non-nullable, being converted into empty strings:

val data = if (input == null && !field.nullable) "" else input

Which subsequently causes the exception to be thrown because an empty string can't be converted into a number:

case dt: DecimalType => parseDecimal(data, isIR)

The weird thing is that the column metadata shows that the field is non-nullable, while "show column in table" and "desc table" both show the field as nullable.

Results in Spark:

[screenshot of the query results in Spark]

Results in Snowflake:

[screenshot of the query results in Snowflake]

The results in Spark appear to be incorrect. I assume it's using the metadata from this query?:

def tableMetaDataFromStatement(statement: SnowflakeSQLStatement,
                               bindVariableEnabled: Boolean = true): ResultSetMetaData =
  (ConstantString("select * from") + statement + "where 1 = 0")
    .execute(bindVariableEnabled)(connection).getMetaData

I tried to write a PR to replace the apparent bad query with one of the good ones, but that would only allow us to get the columns of tables and not queries.

Running these queries from Python gives the same problems, so perhaps the REST endpoint is returning unexpected results.

We currently hack around this by setting all columns to nullable, but any help you can provide would be greatly appreciated.
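In case it helps other readers, one possible way to mark every column nullable on the Spark side (a sketch only; whether this is the right place to apply it depends on where the non-nullable schema originates):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Rebuild the dataframe against a copy of its schema with every field nullable.
def withNullableSchema(df: DataFrame): DataFrame = {
  val nullableSchema = StructType(df.schema.map(_.copy(nullable = true)))
  df.sparkSession.createDataFrame(df.rdd, nullableSchema)
}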

JDBC driver not able to connect to Snowflake. Error code: 390100, Message: Incorrect username or password was specified

Hey there, I am able to connect to Snowflake using the Python JDBC driver but not able to do the same with PySpark in a Jupyter notebook. I have already confirmed the correctness of my username and password.
I also used the package option and the jar option in .\bin\pyspark --packages .., but no success.
Is it some kind of version issue? Any leads?
Environment details:
windows 10
python 3.6.6(jupyter notebook)
spark 2.4.3
snowflake-jdbc 3.8.1
spark-snowflake_2.11-2.4.13-spark_2.4
snowflake server version :- 3.26.1
Code:

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

conf = SparkConf().setAppName("spark_data_loader").setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfAccount" : "<account_name>",
  "sfRole" : "<user_role_name>",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>",
}
df = (spark.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(**sfOptions)
      .option("query", 'select * from "<database>"."<schema>"."<table>"')
      .load())
df.show()

Error stacktrace:

net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver not able to connect to Snowflake. Error code: 390100, Message: Incorrect username or password was specified..
  at net.snowflake.client.core.SessionUtil.newSession(SessionUtil.java:592)
  at net.snowflake.client.core.SessionUtil.openSession(SessionUtil.java:267)
  at net.snowflake.client.core.SFSession.open(SFSession.java:462)
  at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:131)
  at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:148)
  at java.sql.DriverManager.getConnection(DriverManager.java:664)
  at java.sql.DriverManager.getConnection(DriverManager.java:208)
  at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:180)
  at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:56)
  at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:53)
  at scala.Option.getOrElse(Option.scala:121)
  at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:53)
  at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:52)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
  ... 39 elided
 

Staging error when saving to a quoted table name

Version details:
net.snowflake:spark-snowflake_2.11:2.5.0-spark_2.4
net.snowflake:snowflake-jdbc:3.8.4
Spark 2.4.3
Scala 2.11.12

When saving to a Snowflake table whose name is surrounded by double quotes (to preserve case sensitivity), there is an error because of the way the staging table's name is built. You should be able to reproduce the error like this, in spark-shell:

import org.apache.spark.sql.SaveMode

val sfOptions = Map(
    "sfURL" -> url,
    "sfUser" -> user,
    "sfPassword" -> password,
    "sfDatabase" -> database,
    "sfSchema" -> schema,
    "sfWarehouse" -> warehouse
)

val df = Seq(
    (1, "one"),
    (2, "two"),
    (3, "three")
).toDF("number", "word")

df.write.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "\"quoted_name\"").mode(SaveMode.Overwrite).save()

The error is:

ERROR StageWriter$: Error occurred while loading files to Snowflake: net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
syntax error line 1 at position 13 unexpected '_staging_544555026'.
net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
syntax error line 1 at position 13 unexpected '_staging_544555026'.
  at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:139)
  at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64)
  at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:485)
  at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:362)
  at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:502)
  at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:247)
  at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:186)
  at net.snowflake.client.core.SFStatement.execute(SFStatement.java:789)
  at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:245)
  at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:160)
  at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:248)
  at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:246)
  at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$1.apply(SnowflakeJDBCWrapper.scala:283)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

It seems to be the way the staging table name is built here: https://github.com/snowflakedb/spark-snowflake/blob/master/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala#L82. The quotes result in a table name like "quoted_name"_staging_544555026, which causes problems in the SQL statement.

This same example works correctly if usestagingtable is set to off in the options.
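For reference, a minimal illustration of that workaround, using the same import, df, and sfOptions as the repro above:

// Disable the staging table so the quoted name is used directly.
df.write.format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("dbtable", "\"quoted_name\"")
  .option("usestagingtable", "off")
  .mode(SaveMode.Overwrite)
  .save()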

"IsIn" pushdown for empty sequence generates invalid SQL

Hello.

When using a filter like col("x").isin(empty_set), where the empty_set variable is an empty sequence, invalid SQL is generated, something similar to select * from table1 where x in ().

Snowflake will throw an exception trying to parse the statement. It seems that the correct approach would be to generate false in place of such a subquery, as Spark does for its JDBC connector (https://github.com/apache/spark/pull/15977/files), also for consistency reasons.

Best regards,
Evgenii Ignatev.
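Until the pushdown handles this, a user-side guard along these lines avoids generating the empty IN list (illustrative only; df and the values sequence are assumptions):

import org.apache.spark.sql.functions.{col, lit}

// Fall back to a constant false predicate when the value set is empty.
val values: Seq[String] = Seq.empty
val predicate = if (values.isEmpty) lit(false) else col("x").isin(values: _*)
val filtered = df.filter(predicate)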

Support snowflake table creation without explicit columns quotation

Currently, there is a parameter keep_column_case which allows us to turn the uppercasing behavior on or off. Unfortunately, regardless of the value of this setting, column names are always quoted during table creation. We want to avoid quoting the table's columns, so that they are created exactly as named in the DataFrame. As far as I can tell from the source code, there is currently no way to change this behavior.
To implement this, an additional parameter always_quote could be introduced and checked during table schema generation.
This change seems simple, and I plan to fork and make the fix, but I wonder whether there are any considerations behind the current forced quotation.
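Purely as an illustration of the proposal (not existing connector code; always_quote is the hypothetical parameter suggested above):

// Quote a column name only when forced to, or when it is not a plain identifier.
def schemaColumnName(name: String, alwaysQuote: Boolean): String =
  if (alwaysQuote || !name.matches("[A-Za-z_][A-Za-z0-9_$]*")) "\"" + name + "\""
  else name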

java.lang.NumberFormatException: For input string: "inf"

I've got a Snowflake table that has a column of doubles. Some of the values are inf and -inf.

val df = sqlContext
  .read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions())
  .load()
df.count()

When I try to read (or count) this table in Spark, the job fails with the following error:

java.lang.NumberFormatException: For input string: "inf"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:285)
    at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29)
    at net.snowflake.spark.snowflake.Conversions$$anonfun$1.apply(Conversions.scala:156)
    at net.snowflake.spark.snowflake.Conversions$$anonfun$1.apply(Conversions.scala:144)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at net.snowflake.spark.snowflake.Conversions$.net$snowflake$spark$snowflake$Conversions$$convertRow(Conversions.scala:144)
    at net.snowflake.spark.snowflake.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:132)
    at net.snowflake.spark.snowflake.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:132)
    at net.snowflake.spark.snowflake.CSVConverter$$anonfun$convert$1.apply(CSVConverter.scala:73)
    at net.snowflake.spark.snowflake.CSVConverter$$anonfun$convert$1.apply(CSVConverter.scala:34)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:100)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:90)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Looking at where the error happens, it seems to be in the row conversion's toDouble call (Conversions.scala#L156):

at net.snowflake.spark.snowflake.Conversions$$anonfun$1.apply(Conversions.scala:156)

data.toDouble will not work if the input is inf; in Scala the value should be Infinity instead (which comes from Double.PositiveInfinity.toString).

Will a PR with the following patch be acceptable?

case DoubleType => {
  if(data.equals("inf"))
    Double.PositiveInfinity
  else if(data.equals("-inf"))
    Double.NegativeInfinity
  else
    data.toDouble
}

Otherwise what should be the workaround to avoid crashing in similar cases?

Spark 2.2.1?

I'm curious about any plans to publish an updated connector for Spark 2.2.1. We'd like to use the newer Spark and stay current with what is available in AWS EMR, and this connector is currently the main complicating factor.

In my experiments I've verified that this doesn't cleanly compile against Spark 2.2.x; Snowflake support claims that it already supports 2.2.x (which is not what is claimed in the docs: https://docs.snowflake.net/manuals/user-guide/spark-connector-install.html#comparison-of-supported-versions).

Error passing in the private key to snowflake-spark connector

I am looking to create an ETL process that reads queries from Snowflake. Most of the examples online show how to set up a connection using a regular string password, but my company has set up authentication via a private key. Unfortunately, when I try to pass in the private key as a parameter, it returns an error (seen below). It is worth noting that the python-snowflake connector works just fine using the same credentials.

I am using the following:
Python 3.7
Spark 2.3.0

    Traceback (most recent call last):
      File "/Users/rihun/PycharmProjects/snowflake_gcp_etl/loader.py", line 61, in <module>
        .option("query", query) \
      File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.py", line 172, in load
        return self._df(self._jreader.load())
      File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py", line 79, in deco
        raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.IllegalArgumentException: 'Input PEM private key is invalid'

Code Example:

    import findspark
    findspark.init()
    
    import pyspark
    import os
    
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.6.12,net.snowflake:spark-snowflake_2.11:2.4.8 pyspark-shell'
    
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, SparkSession
    from pyspark.sql.types import *
    from snowflake_connector import get_keeper_token, get_snowflake_credentials
    
    spark = SparkSession.builder.master('local').appName('Snowflake Loader').config('spark.driver.memory', '5G').getOrCreate()
    
    spark.builder.config('spark.executor.memory', '16G')
    spark.builder.config('spark.executor.cores', '4')
    
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
    
    sf_creds = get_snowflake_credentials(keeper_token=get_keeper_token())
    
    sfOptions = {
        "sfURL": sf_creds['sfURL'],
        "sfAccount": sf_creds['sfAccount'],
        "sfUser": sf_creds['sfUser'],
        "pem_private_key": sf_creds['sfPrivateKey'],
        "sfDatabase": sf_creds['sfDatabase'],
        "sfSchema": sf_creds['sfSchema'],
        "sfWarehouse": sf_creds['sfWarehouse'],
    }
    
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", query) \
        .load()
    
    df.count()

How I am getting the credentials

    import hvac
    import snowflake.connector
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization


    def get_snowflake_credentials(keeper_token: str,
                                  keeper_url='<keeper_url>',
                                  keeper_namespace='cloudDB',
                                  keeper_secret_path='<path_to_key>',
                                  sf_account='<sf_account>',
                                  sf_svc_user='<user>',
                                  sf_wh='<warehouse>',
                                  sf_role='<role>',
                                  sf_db='<db>',
                                  sf_schema='<schema>'):
        # Connect to Keeper to collect secrets
        client = hvac.Client(
            url=keeper_url,
            namespace=keeper_namespace,
            token=keeper_token
        )

        # Secrets are stored within the key entitled 'data'
        keeper_secrets = client.read(keeper_secret_path)['data']
        passphrase = keeper_secrets['SNOWSQL_PRIVATE_KEY_PASSPHRASE']
        private_key = keeper_secrets['private_key']

        # PEM key must be byte encoded
        key = bytes(private_key, 'utf-8')

        p_key = serialization.load_pem_private_key(
            key,
            password=passphrase.encode(),
            backend=default_backend()
        )

        pkb = p_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

        sf_client = snowflake.connector.connect(
            user=sf_svc_user,
            account=sf_account,
            warehouse=sf_wh,
            role=sf_role,
            database=sf_db,
            schema=sf_schema,
            private_key=pkb
        )

        return {
            "sfURL": "<url>",
            "sfAccount": sf_account,
            "sfUser": sf_svc_user,
            "sfPrivateKey": pkb,
            "sfDatabase": sf_db,
            "sfSchema": sf_schema,
            "sfWarehouse": sf_wh
        }
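
For what it's worth, the option name pem_private_key suggests the connector expects the key as a PEM/base64 string rather than the DER bytes produced by private_bytes above. A minimal sketch of that idea in Scala (the file name and the expected format are assumptions, not something confirmed here); the same transformation can be applied to the Python code above:

import java.nio.file.{Files, Paths}

object PemKeyBody extends App {
  // Read an unencrypted PKCS#8 private key (the file name is an assumption)
  val pem = new String(Files.readAllBytes(Paths.get("rsa_key.p8")), "UTF-8")
  // Keep only the base64 body: drop the BEGIN/END lines and all whitespace
  val pemBody = pem
    .replace("-----BEGIN PRIVATE KEY-----", "")
    .replace("-----END PRIVATE KEY-----", "")
    .replaceAll("\\s", "")
  // This string, not raw DER bytes, would be passed as the "pem_private_key" option
  println(pemBody.take(16) + "...")
}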

truncate_table, preactions and postactions are not executing.

I use
"net.snowflake" % "spark-snowflake_2.11" % "2.4.13-spark_2.2",

I tried to use truncate_table with mode Append, ideally so that I can keep adding data while keeping the schema intact. So I used:
mode "Append" and truncatetable "on" - 0 records added.
mode "Append" - 0 records added.
mode "Overwrite" - 403 records added (successful, but the schema changed).
mode "Overwrite" and truncatetable "on" - 0 records added.
Since Overwrite works, I tried to write to a temp table and then use "INSERT INTO" to copy into the actual table. I've tried this in two ways: with Utils.runQuery(), and with preactions and postactions. Neither works. I also found that Utils.runQuery() can only be used to create a table, and I have no clue why preactions and postactions don't work.
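
If the goal is to replace the data while keeping the existing table definition, the combination implied by the StageWriter condition quoted in a later issue is Overwrite plus truncate_table=on plus usestagingtable=off. A minimal sketch, with the connection options and table name as placeholders (not a confirmed fix for the 0-records behavior above):

import org.apache.spark.sql.{SaveMode, SparkSession}

object TruncateOverwriteSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val df = spark.range(10).toDF("ID")

  val sfOptions = Map(                       // placeholder connection options
    "sfURL" -> "<account>.snowflakecomputing.com",
    "sfUser" -> "<user>", "sfPassword" -> "<password>",
    "sfDatabase" -> "<db>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<wh>")

  df.write
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("dbtable", "MY_TABLE")           // placeholder table name
    .option("truncate_table", "on")          // keep the existing schema
    .option("usestagingtable", "off")        // required for truncate_table to take effect
    .mode(SaveMode.Overwrite)
    .save()
}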

Query: How is a dataframe created out of a Snowflake table?

Can someone please explain below use-case

  1. Read from Snowflake
    How is a Snowflake table converted into a dataframe? As far as I can see, it is unloaded into CSV files by the COPY command.

Queries observed while reading data from Snowflake with the Snowflake Spark connector:

select * from (SELECT "PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY" FROM "SALES"."PUBLIC"."PERSONS_SNW" WHERE ( "PERSONS_SNW"."PERSONID" < 3) ORDER BY "PERSONS_SNW"."PERSONID") where 1 = 0

create temporary stage if not exists identifier(spark_connector_unload_stage_AliiT0AIbv)
// Why do we use GET here, and where does file:///tmp/dummy_location_spark_connector_tmp/ exist? Is it on the Spark cluster or on the Snowflake side?

GET @spark_connector_unload_stage_AliiT0AIbv/ file:///tmp/dummy_location_spark_connector_tmp/

alter session set timezone = 'Asia/Kolkata',
timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3',
timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3',
timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3';

// The command below unloads the table data to CSV files capped at MAX_FILE_SIZE (set here to 10000000 bytes)
COPY INTO '@spark_connector_unload_stage_AliiT0AIbv/YGRowzjKNG/' FROM ( SELECT "LASTNAME", "FIRSTNAME", "PERSONID", "CITY", "ADDRESS" FROM (SELECT "PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY" FROM "SALES"."PUBLIC"."PERSONS_SNW" WHERE ( "PERSONS_SNW"."PERSONID" < 3) ORDER BY "PERSONS_SNW"."PERSONID") )
FILE_FORMAT = (
TYPE=CSV
COMPRESSION='gzip'
FIELD_DELIMITER='|'
FIELD_OPTIONALLY_ENCLOSED_BY='"'
ESCAPE_UNENCLOSED_FIELD = none
NULL_IF= ()
)
MAX_FILE_SIZE = 10000000

spark-snowflake is not compatible with COPY INTO behavior change

Change detailed here:

https://support.snowflake.net/s/article/COPY-INTO-location-Command-Output-Changes-Pending?r=6&ui-force-components-controllers-recordGlobalValueProvider.RecordGvp.getRecord=1

Which, as far as I can tell, will break pretty much all recent/semi-recent versions of spark-snowflake since they all have some version of this:

assert(sch.getColumnName(1) == "rows_unloaded")

df.show() resulting in error - java.lang.NoClassDefFoundError: net/snowflake/client/jdbc/telemetry/TelemetryClient

It would appear as though I'm able to successfully connect to Snowflake using pyspark, but as soon as I use df.show(), I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o51.showString.
: java.lang.NoClassDefFoundError: net/snowflake/client/jdbc/telemetry/TelemetryClient
at net.snowflake.spark.snowflake.DefaultJDBCWrapper$DataBaseOperations.getTelemetry(SnowflakeJDBCWrapper.scala:467)
at net.snowflake.spark.snowflake.io.StageReader$.sendEgressUsage(StageReader.scala:124)
at net.snowflake.spark.snowflake.io.StageReader$.readFromStage(StageReader.scala:57)
at net.snowflake.spark.snowflake.io.package$.readRDD(package.scala:39)
at net.snowflake.spark.snowflake.SnowflakeRelation.getSnowflakeRDD(SnowflakeRelation.scala:169)
at net.snowflake.spark.snowflake.SnowflakeRelation.getRDD(SnowflakeRelation.scala:156)
at net.snowflake.spark.snowflake.SnowflakeRelation.buildScan(SnowflakeRelation.scala:146)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3254)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: net.snowflake.client.jdbc.telemetry.TelemetryClient
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 52 more

My code example:

from pyspark.sql import SQLContext, SparkSession
import logging
from logging import getLogger
import queries
from snowflake_connector import ConnectToSnowflake

v_log = 'logs/spark.log'

sfOptions = ConnectToSnowflake(creds_path='creds.json').get_spark_sf_creds()

spark = SparkSession \
    .builder \
    .config("spark.jars", "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.5.9-spark_2.4.jar,"
                          "jars/gcs-connector-hadoop3-2.0.1.jar") \
    .config("spark.repl.local.jars",
            "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.5.9-spark_2.4.jar, "
            "jars/gcs-connector-hadoop3-2.0.1.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

logging.basicConfig(
    filename=v_log,
    level=logging.DEBUG)
logger = getLogger(__name__)

snowflake_source_name = 'net.snowflake.spark.snowflake'

df = spark.read.format(snowflake_source_name) \
    .options(**sfOptions) \
    .option("query", queries.getCustomerBaseQuery) \
    .load()

df.show()
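
A NoClassDefFoundError on a net.snowflake.client.jdbc.telemetry class usually points to a JDBC driver jar older than the one the connector release was built against. A hedged sketch of pinning a matched pair via spark.jars.packages instead of hand-picked jar files; the coordinates below are simply the pairing that appears in another issue in this document, not a certified combination:

import org.apache.spark.sql.SparkSession

object MatchedJarsSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("snowflake-read")
    // Let Spark resolve both artifacts together so their versions stay in sync;
    // check the Snowflake docs for the JDBC version certified for your connector release.
    .config("spark.jars.packages",
      "net.snowflake:spark-snowflake_2.11:2.7.0-spark_2.4," +
      "net.snowflake:snowflake-jdbc:3.12.3")
    .getOrCreate()
}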

Spark Snowflake Connector Scala version compatibility

Scala version 2.12.7
Spark 2.4.2
Spark-Snowflake 2.4.14

Our application uses the combination of the three above, and we use the spark-snowflake connector to read data in from Snowflake. Here is the exception we are hitting:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at net.snowflake.spark.snowflake.Parameters$MergedParameters.<init>(Parameters.scala:208)
at net.snowflake.spark.snowflake.Parameters$.mergeParameters(Parameters.scala:202)
at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:59)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
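
A NoSuchMethodError on scala.Product.$init$ is the classic symptom of a Scala binary-version mismatch: the connector jar was built for a different Scala minor version (2.11 vs 2.12) than the scala-library on the classpath. A hedged build.sbt sketch that keeps everything on 2.12 by using %% (the connector version shown is illustrative; pick a release actually published for _2.12 on Maven Central):

scalaVersion := "2.12.7"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.12) to every artifact automatically
  "org.apache.spark" %% "spark-sql"       % "2.4.2" % "provided",
  "net.snowflake"    %% "spark-snowflake" % "2.5.9-spark_2.4"
)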

Documentation incomplete: "usestagingtable" and "truncateTable" relationship not mentioned

Hello.

Recently we stumbled on the fact that it is not enough to simply set usestagingtable=off to disable staging table creation when we write data to Snowflake.

In reality there is a need to also set truncateTable=on in this case, e.g.:

df.write() \
      .format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sf_options) \
      .option("dbtable", table) \
      .option("usestagingtable", "off") \
      .mode(SaveMode.Overwrite) \
      .save()

See StageWriter relevant code line:

if (params.useStagingTable || !params.truncateTable) {

Am I missing something? If not, I think the documentation should be adjusted to mention this relationship.

Expired S3 Tokens

We're trying to run some spark code on an AWS EMR instance and we get the following error that usually occurs around the ~45 minute mark. We've messed around with a handful of config options, but we haven't been able to nail anything down.

The configuration we're using is:

[{"classification":"spark-env", "properties":{}, "configurations":[{"classification":"export", "properties":{"PYSPARK_PYTHON":"python34"}, "configurations":[]}]},{"classification":"spark-defaults", "properties":{"spark.executor.memory":"10G", "spark.yarn.executor.memoryOverhead":"10G"}, "configurations":[]}]

The step we're running is:

spark-submit --deploy-mode cluster --packages com.amazonaws:aws-java-sdk-s3:1.11.269,net.snowflake:spark-snowflake_2.11:2.2.8,org.apache.hadoop:hadoop-aws:2.7.2 --py-files s3://helper_functions.zip s3://py_file.py

The error we're getting on each individual node is:

18/01/27 21:52:26 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 627, ip-10-0-16-5.ec2.internal, executor 8): com.amazonaws.services.s3.model.AmazonS3Exception: The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken; Request ID:request_id; S3 Extended Request ID: request_id), S3 Extended Request ID: request_id
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1638)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1303)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1398)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1259)
at net.snowflake.spark.snowflake.SnowflakeRDD$$anonfun$compute$1.apply(SnowflakeRDD.scala:115)
at net.snowflake.spark.snowflake.SnowflakeRDD$$anonfun$compute$1.apply(SnowflakeRDD.scala:90)
at scala.collection.immutable.List.foreach(List.scala:381)
at net.snowflake.spark.snowflake.SnowflakeRDD.compute(SnowflakeRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Support Writing Spark Vector Columns into Snowflake Array Columns

Often in spark ML pipelines, we work with vector columns (Sparse and Dense vectors) through the dataframe API. One operation we recently tried is writing these columns into a snowflake table where the vector column maps to an array column in the table. This currently isn't supported and results in errors.

Is it possible to add this functionality to the spark-snowflake driver?

The workaround we are using currently is to convert the vector into an array type column through a pyspark UDF, but this is a pretty expensive, non-ideal solution given our dataset size. We can see a lot of value in having this functionality natively supported through the spark-snowflake driver.
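
For reference, a Scala flavour of the UDF workaround mentioned above (the column names and the tiny example frame are made up): expand the ML vector into a plain array column, which the connector can then write into a Snowflake ARRAY/VARIANT column.

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object VectorToArraySketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Tiny example frame with an ML vector column
  val featuresDf = Seq((1, Vectors.dense(0.1, 0.2)), (2, Vectors.dense(0.3, 0.4)))
    .toDF("id", "features")

  // Convert the vector into an array column before the Snowflake write
  val vecToArray = udf { v: Vector => v.toArray }
  val writable = featuresDf.withColumn("features", vecToArray(col("features")))
  writable.printSchema() // features: array<double>
}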

Connection reset exceptions while reading data

Running spark on kubernetes, spark-snowflake 2.4.11-spark_2.4.

Seeing frequent "connection reset" stack traces when reading queried data. The query (identified by tag) shows successful in the snowflake query history, so this appears to be happening after the query/unload, when the client downloads the result.

This spark job does several identical queries with differing parameters. Most succeed but some die like this. Sometimes a retry works, sometimes it fails a few times in a row. Any ideas? This seems to be the root cause of these failures, but it's not clear to me why it's happening - whether it's a spark-snowflake, snowflake, or aws s3 issue.

2019-01-01 08:16:44 WARN  TaskSetManager:66 - Lost task 155.3 in stage 6340.0 (TID 454070, 100.121.128.10, executor 29): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
	at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
	at sun.security.ssl.InputRecord.read(InputRecord.java:532)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
	at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
	at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
	at net.snowflake.client.jdbc.internal.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
	at net.snowflake.client.jdbc.internal.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
	at net.snowflake.client.jdbc.internal.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:206)
	at net.snowflake.client.jdbc.internal.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
	at net.snowflake.client.jdbc.internal.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at net.snowflake.client.jdbc.internal.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at net.snowflake.client.jdbc.internal.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at net.snowflake.client.jdbc.internal.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
	at java.security.DigestInputStream.read(DigestInputStream.java:161)
	at net.snowflake.client.jdbc.internal.amazonaws.services.s3.internal.DigestValidationInputStream.read(DigestValidationInputStream.java:59)
	at net.snowflake.client.jdbc.internal.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
	at java.io.FilterInputStream.read(FilterInputStream.java:107)
	at javax.crypto.CipherInputStream.getMoreData(CipherInputStream.java:121)
	at javax.crypto.CipherInputStream.read(CipherInputStream.java:246)
	at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:238)
	at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
	at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
	at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:122)
	at net.snowflake.spark.snowflake.io.SFRecordReader.readChar(SFRecordReader.scala:167)
	at net.snowflake.spark.snowflake.io.SFRecordReader.next(SFRecordReader.scala:124)
	at net.snowflake.spark.snowflake.io.SFRecordReader.next(SFRecordReader.scala:32)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-01-01 08:16:44 ERROR TaskSetManager:70 - Task 155 in stage 6340.0 failed 4 times; aborting job
2019-01-01 08:16:44 INFO  TaskSchedulerImpl:54 - Removed TaskSet 6340.0, whose tasks have all completed, from pool
2019-01-01 08:16:44 INFO  TaskSchedulerImpl:54 - Cancelling stage 6340
2019-01-01 08:16:44 INFO  TaskSchedulerImpl:54 - Killing all running tasks in stage 6340: Stage cancelled
2019-01-01 08:16:44 INFO  DAGScheduler:54 - ShuffleMapStage 6340 (map at sorted_event_arrays.scala:17) failed in 1982.776 s due to Job aborted due to stage failure: Task 155 in stage 6340.0 failed 4 times, most recent failure: Lost task 155.3 in stage 6340.0 (TID 454070, 100.121.128.10, executor 29): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
..

table creation fails if column name is a keyword

calling dataframe.write() to a table that does not exist yet results in:

CREATE TABLE IF NOT EXISTS test_schema.test_table (id INTEGER NOT NULL, created_at TIMESTAMP, updated_at TIMESTAMP, name STRING NOT NULL, order INTEGER)

which fails with:

SQL compilation error: syntax error line 1 at position 138 unexpected ''order''.

whereas:

CREATE TABLE IF NOT EXISTS test_schema.test_table (id INTEGER NOT NULL, created_at TIMESTAMP, updated_at TIMESTAMP, name STRING NOT NULL, "order" INTEGER)

succeeds.

would it make sense to always double-quote the column names during that SQL statement construction?
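
Until the generated CREATE TABLE quotes identifiers, two possible workarounds, sketched below with the names from the failing statement above (df is the frame being written and sfOptions is the option map passed to df.write; that Utils.runQuery accepts this same map is an assumption based on how it is used elsewhere in these issues):

import net.snowflake.spark.snowflake.Utils

// Workaround 1: pre-create the table with the reserved word quoted, then
// write with SaveMode.Append so the connector skips its own CREATE TABLE.
Utils.runQuery(sfOptions,
  """CREATE TABLE IF NOT EXISTS test_schema.test_table (
    |  id INTEGER NOT NULL, created_at TIMESTAMP, updated_at TIMESTAMP,
    |  name STRING NOT NULL, "order" INTEGER)""".stripMargin)

// Workaround 2: rename the offending column on the Spark side before writing.
val safeDf = df.withColumnRenamed("order", "order_value")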

net.snowflake.client.jdbc.SnowflakeSQLException: No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.

USING:
"net.snowflake" %% "spark-snowflake" % "2.3.1-spark_2.1"

val sfOptions = Map(
  "sfURL" -> SNOWFLAKE_URL,
  "sfAccount" -> SNOWFLAKE_ACCOUNT,
  "sfUser" -> SNOWFLAKE_USER,
  "sfPassword" -> SNOWFLAKE_PASSWORD,
  "sfDatabase" -> sfDatabase,
  "sfRole" -> SNOWFLAKE_ROLE,
  "sfSchema" -> sfSchema,
  "sfWarehouse" -> SNOWFLAKE_WAREHOUSE,
  "dbtable" -> sfTable
)

df.write
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .mode(SaveMode.Append)
  .save()

THROWS:

ERROR SnowflakeWriter: Error occurred while loading files to Snowflake: net.snowflake.client.jdbc.SnowflakeSQLException: No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.

Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command.

at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:99)
at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:410)
at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:373)
at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:197)
at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:149)
at net.snowflake.client.core.SFStatement.execute(SFStatement.java:531)
at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:204)
at net.snowflake.client.jdbc.SnowflakeStatementV1.execute(SnowflakeStatementV1.java:239)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:261)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:261)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$3.apply(SnowflakeJDBCWrapper.scala:283)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Writing on empty df is throwing exception java.util.NoSuchElementException: head of empty list

Repro:
spark.read returns an empty df; invoking write on the same data frame throws an exception.

scala> val df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(sfOptions).option("query", "select * from PERSONS_READ where
personid=1200").load()
df: org.apache.spark.sql.DataFrame = [NAME: string, PERSONID: decimal(38,0)]

scala> df.show
+----+--------+
|NAME|PERSONID|
+----+--------+
+----+--------+

scala> df.write.format(SNOWFLAKE_SOURCE_NAME).options(sfOptions).option("dbtable","PERSONS_TGT").mode(SaveMode.Append).save()
;
java.util.NoSuchElementException: head of empty list
at scala.collection.immutable.Nil$.head(List.scala:420)
at scala.collection.immutable.Nil$.head(List.scala:417)
at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:59)
at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:53)
at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:103)
at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
... 49 elided
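
Until this is handled in the connector, a defensive sketch (reusing the names from the repro above) is to skip the save when the frame is empty:

import org.apache.spark.sql.SaveMode

if (!df.head(1).isEmpty) {
  df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "PERSONS_TGT")
    .mode(SaveMode.Append)
    .save()
}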

Cross Database Joins

Currently, cross database joins are not pushed down due to the following snippet in SnowflakeQuery

/** Triplet that defines the Snowflake cluster that houses this base relation.
  * Currently an exact match on cluster is needed for a join, but we may not need
  * to be this strict.
  */
val cluster = (relation.params.sfURL,
               relation.params.sfWarehouse,
               relation.params.sfDatabase)

My team and I have a use case where we require cross-database joins to be pushed down, so I removed the sfDatabase reference and tested with a local build. It seems to work so far, but are there any downsides to expect?

It would be good to know the exact reason for the restriction in the first place.

SparkWriter stages previously uploaded data

I have a Spark Cluster that streams events from Kafka and saves them into a Snowflake table.
I process Kafka messages in batches every 30 seconds.
The batch processor is written in Python, however, it uses spark writer to save the updates to the table.
I am using this version of the Spark connector
net.snowflake:spark-snowflake_2.11:2.7.0-spark_2.4,net.snowflake:snowflake-jdbc:3.12.3
Code:

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
sfOptions = {
    "sfURL": f"{SNOWFLAKE_DATA_CREDENTIAL['host']}.snowflakecomputing.com",
    "sfUser": SNOWFLAKE_DATA_CREDENTIAL['user'],
    "sfPassword": SNOWFLAKE_DATA_CREDENTIAL['password'],
    "sfDatabase": SNOWFLAKE_DATA_CREDENTIAL['database'],
    "sfSchema": SNOWFLAKE_DATA_CREDENTIAL['schema'],
    "sfWarehouse": SNOWFLAKE_DATA_CREDENTIAL['warehouse'],
    "dbtable": SNOWFLAKE_DATA_CREDENTIAL['table'],
    "sfRole": SNOWFLAKE_DATA_CREDENTIAL['role']
}
sf_df.write.format(SNOWFLAKE_SOURCE_NAME).mode('append')\
    .options(**sfOptions) \
    .save()

As I am watching the history of the queries in Snowflake, I see the data size constantly growing, and the row count increasing for each staged "COPY INTO" event.

command | size | driver | row count 
copy into TABLE FROM @spark_connector_load_stage_GMX92NImfX/...| 8.0MB | JDBC 3.12.3 | 117.4K
copy into TABLE FROM @spark_connector_load_stage_y7ZtmK7L2B... |8.0MB | JDBC 3.12.3 | 117.5K

I am confused as to why this is happening. I know for a fact that the data I am writing is tiny in the dev environment; as you can see, only about a hundred rows were added to the staged table between the two loads.

Yes, only the updates are written to the Snowflake table, but this is probably going to blow up when I move my code to production and the data quickly accumulates into gigabytes.

Note: setting the option USE_CACHED_RESULT to False does nothing.

PySpark: df.write.mode('overwrite') not respected

Current Behavior

df = spark.read.format(sfSource).options(**sfOptions).option('query',  query).load()

df.write.mode('overwrite').format(sfSource).options(**sfOptions).option("dbtable", table).option("parallelism", "15").save()

Results in an error:
Error occurred while loading files to Snowflake: net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Object '<TABLE_NAME>' already exists.

Expected Behavior

The table is loaded into a spark dataframe and then written to a table in snowflake overwriting any existing data/table with the given table name.

Steps to Reproduce

Run the code snipped in the Current Behavior section

Context (Environment)

Java Version: 1.8.0_222 (Private Build)
Java Home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Scala Version: 2.11.8

using: net.snowflake:snowflake-jdbc:3.6.12 and net.snowflake:spark-snowflake_2.11:2.4.5

Extra Details

Failed Snowflake Queries (in order):
desc table identifier(<TABLE_NAME>)
alter table identifier(<TABLE_NAME>_staging_70934098) rename to identifier(<TABLE_NAME>)
drop table identifier(<TABLE_NAME>_staging_70934098)

Full stack trace:

Traceback (most recent call last):
  File "/home/ubuntu/data-science-modeling/recommendation_engine/etl_min/etl.py", line 79, in <module>
    main()
  File "/home/ubuntu/data-science-modeling/recommendation_engine/etl_min/etl.py", line 72, in main
    write_to_snowflake(df, "ETL_TEST_1", sfSource, **sfOptions)
  File "/home/ubuntu/data-science-modeling/recommendation_engine/etl_min/etl.py", line 41, in write_to_snowflake
    ).option("parallelism", "8").save()
  File "/home/ubuntu/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 703, in save
  File "/home/ubuntu/spark-2.3.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/ubuntu/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/ubuntu/spark-2.3.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o56.save.
: net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
Object 'ETL_TEST_1' already exists.
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:135)
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:60)
	at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:471)
	at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:358)
	at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:473)
	at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:230)
	at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:172)
	at net.snowflake.client.core.SFStatement.execute(SFStatement.java:661)
	at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:161)
	at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:138)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:251)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:249)
	at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$2.apply(SnowflakeJDBCWrapper.scala:286)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Processing aborted due to error 300010:1087106694; incident 7314215.

When using the Snowflake Spark Connector to read the INFORMATION_SCHEMA views, I get this error: level 1 SQL Error [603] [XX000]: SQL execution internal error:
Processing aborted due to error 300010:1087106694; incident 7314215.

The query is reading from INFORMATION_SCHEMA.TABLES view, and doing a COPY INTO a previously created internal stage. The query is generated by the Spark Connector, latest version.
Here is some output produced by the failing Spark code:
19/08/22 20:04:57 ERROR SnowflakeStrategy: Pushdown failed :SQL execution internal error:
Processing aborted due to error 300010:1087106694; incident 7938892.
19/08/22 20:05:30 ERROR SnowflakeStrategy: Pushdown failed :SQL execution internal error:
Processing aborted due to error 300010:1087106694; incident 2356683.
net.snowflake.client.jdbc.SnowflakeSQLException: SQL execution internal error:
Processing aborted due to error 300010:1087106694; incident 9371821.
at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:139)
at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64)
at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:485)
at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:362)
at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:504)
at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:249)
at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:187)
at net.snowflake.client.core.SFStatement.execute(SFStatement.java:793)
at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:258)
at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:166)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:248)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$executePreparedQueryInterruptibly$1.apply(SnowflakeJDBCWrapper.scala:246)
at net.snowflake.spark.snowflake.JDBCWrapper$$anonfun$1.apply(SnowflakeJDBCWrapper.scala:283)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Note that I have already tried to deactivate query push down, with no effect.

Note also that I've tested on the latest version 1.5.2 with the same results. The credentials have proper permissions to create stages: the create stage succeeds, but the copy into fails.
Thanks in advance.

Snowflake source doesn't work with query

We are using version 2.4.5 to read data from Snowflake. We are providing a query instead of a table name. It gives this exception:

net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
syntax error line 1 at position 137 unexpected 'SELECT'.
syntax error line 1 at position 175 unexpected '0'.
syntax error line 1 at position 215 unexpected 'FROM'.
syntax error line 1 at position 267 unexpected 'FROM'.
syntax error line 1 at position 473 unexpected ')'.
Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
syntax error line 1 at position 137 unexpected 'SELECT'.
syntax error line 1 at position 175 unexpected '0'.
syntax error line 1 at position 215 unexpected 'FROM'.
syntax error line 1 at position 267 unexpected 'FROM'.
syntax error line 1 at position 473 unexpected ')'.
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:135)
	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:60)
	at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:471)
	at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:358)
	at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:473)
	at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:230)
	at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:172)
	at net.snowflake.client.core.SFStatement.describe(SFStatement.java:191)
	at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.<init>(SnowflakePreparedStatementV1.java:114)
	at net.snowflake.client.jdbc.SnowflakeConnectionV1.prepareStatement(SnowflakeConnectionV1.java:621)
	at net.snowflake.spark.snowflake.SnowflakeSQLStatement.execute(SnowflakeJDBCWrapper.scala:591)
	at net.snowflake.spark.snowflake.io.DataUnloader$class.setup(DataUnloader.scala:57)
	at net.snowflake.spark.snowflake.io.SFInternalRDD.setup(SFInternalRDD.scala:46)
	at net.snowflake.spark.snowflake.io.SFInternalRDD$$anonfun$1.apply(SFInternalRDD.scala:67)
	at net.snowflake.spark.snowflake.io.SFInternalRDD$$anonfun$1.apply(SFInternalRDD.scala:66)
	at net.snowflake.spark.snowflake.io.SFInternalStage.executeWithConnection(SFInternalStage.scala:425)
	at net.snowflake.spark.snowflake.io.SFInternalRDD.<init>(SFInternalRDD.scala:66)
	at net.snowflake.spark.snowflake.io.package$.readRDD(package.scala:43)
	at net.snowflake.spark.snowflake.SnowflakeRelation.getRDD(SnowflakeRelation.scala:177)
	at net.snowflake.spark.snowflake.SnowflakeRelation.buildScan(SnowflakeRelation.scala:157)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:206)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:206)
	at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:206)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)

I debugged through the code and it seems that the problem is in this line https://github.com/snowflakedb/spark-snowflake/blob/master/src/main/scala/net/snowflake/spark/snowflake/SnowflakeRelation.scala#L203

private def standardStatement(
                                 requiredColumns: Array[String],
                                 filters: Array[Filter]
                               ): SnowflakeSQLStatement = {

    assert(!requiredColumns.isEmpty)
    // Always quote column names, and uppercase-cast them to make them equivalent to being unquoted
    // (unless already quoted):
    val columnList = requiredColumns
      .map(col => if (isQuoted(col)) col else "\"" + col.toUpperCase + "\"")
      .mkString(", ")
    val whereClause = FilterPushdown.buildWhereStatement(schema, filters)
    val tableNameOrSubquery: StatementElement =
      params.table.map(_.toStatement).getOrElse(ConstantString(params.query.get))
    ConstantString("SELECT") + columnList + "FROM" + tableNameOrSubquery + whereClause
  }
}

should be

private def standardStatement(
                                 requiredColumns: Array[String],
                                 filters: Array[Filter]
                               ): SnowflakeSQLStatement = {

    assert(!requiredColumns.isEmpty)
    // Always quote column names, and uppercase-cast them to make them equivalent to being unquoted
    // (unless already quoted):
    val columnList = requiredColumns
      .map(col => if (isQuoted(col)) col else "\"" + col.toUpperCase + "\"")
      .mkString(", ")
    val whereClause = FilterPushdown.buildWhereStatement(schema, filters)
    val tableNameOrSubquery: StatementElement =
      params.table.map(_.toStatement).getOrElse(ConstantString("(" + params.query.get + ")"))
    ConstantString("SELECT") + columnList + "FROM" + tableNameOrSubquery + whereClause
  }
}

In this code path the query is not wrapped in parentheses before being used as a subquery, which produces the syntax errors above.

Timezone ignored when parsing timezone-aware datetimes

I have a question about the timestamp with timezone parsing in snowflake here:

val timestampString = "2014-03-01 00:00:01.123 -0800"

The expected timestamp in that test is the same as it would be without the timezone info, so it seems the parser is ignoring the offset.

I noticed this while recently fixing this issue in a different fork of spark-redshift here: https://github.com/spark-redshift-community/spark-redshift/pull/26/files
So now I am wondering if I am missing something and made a mistake in my repo, since the code here differs.
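
To make the concern concrete, a small standalone check of what honoring the offset would mean (the format pattern is an assumption used only for illustration, not the one the connector uses):

import java.text.SimpleDateFormat
import java.util.TimeZone

object TzParseCheck extends App {
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z")
  val parsed = parser.parse("2014-03-01 00:00:01.123 -0800")

  val utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  utc.setTimeZone(TimeZone.getTimeZone("UTC"))
  // If the -0800 offset is honored, the instant is 08:00:01.123 UTC,
  // not 00:00:01.123 as the test currently expects.
  println(utc.format(parsed)) // 2014-03-01 08:00:01.123
}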
