mrpowers / spark-daria
Essential Spark extensions and helper methods ✨😲
License: MIT License
spark-shell --packages MrPowers:spark-fast-tests:1.1.3
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: MrPowers#spark-fast-tests;1.1.3: not found]
@snithish @gorros @kirubakarrs @oscarvarto - I’ve always thought it’s a bit random how Spark defines some functionality as Column functions and other functionality as SQL functions. Here’s an example of the inconsistency:
lower(col("blah").substr(0, 2))
Having two SQL functions would look like this:
lower(substr(col("blah"), 0, 2))
Having two Column functions would look like this:
col("blah").substr(0, 2).lower()
I like the Column functions syntax, so I started monkey patching the SQL functions to the Column class: https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala Let me know your thoughts.
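For illustration, the monkey patching looks roughly like this (a simplified sketch; see the linked FunctionsAsColumnExt.scala for the real implementation):
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions
// Simplified sketch: an implicit class that exposes SQL functions as Column methods.
object FunctionsAsColumnExt {
  implicit class ColumnMethods(c: Column) {
    def lower(): Column = functions.lower(c)
    def trim(): Column = functions.trim(c)
  }
}
// With the implicit in scope, the Column-function style works:
// import FunctionsAsColumnExt._
// col("blah").substr(0, 2).lower()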
Why not add the Scala version to the artifact name, e.g. spark-daria_2.11?
A careless user like me may leave the () out when defining the schema:
val df = spark.createDF(List("foo", "bar"), List("data", StringType, true))
Currently, a MatchError is thrown at runtime. Would it be better to report the error at compile time?
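For reference, the intended call wraps each column spec in a (name, type, nullable) tuple, so the extra parentheses matter (this assumes the spark-daria createDF extension is in scope on spark):
import org.apache.spark.sql.types.StringType
val df = spark.createDF(
  List("foo", "bar"),
  List(("data", StringType, true))
)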
The createDF method is very useful. I think a createDS method is needed as well, so we don't have to do this:
val sourceDS = spark.createDataset[Person](
  Seq(
    Person("Alice", 12),
    Person("Bob", 17)
  )
)
I'd rather do this:
val sourceDS = spark.createDS[Person](
  List(
    ("Alice", 12),
    ("Bob", 17)
  )
)
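As a minimal sketch (hypothetical names, not part of spark-daria), a createDS extension could simply wrap createDataset; converting tuples like ("Alice", 12) into Person instances would need extra machinery (e.g. shapeless Generic), which this sketch leaves out:
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
object SparkSessionDatasetExt {
  implicit class CreateDSMethods(spark: SparkSession) {
    // Shorthand for spark.createDataset; the Encoder is resolved implicitly,
    // e.g. via `import spark.implicits._` for case classes like Person.
    def createDS[T: Encoder](data: Seq[T]): Dataset[T] =
      spark.createDataset(data)
  }
}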
Spark allows you to use primitive objects to store metadata about columns in a DataFrame. This is useful in ML and analytical tasks where you'd like to avoid recomputing basic statistics on the data. The current way to do that is very messy. This feature is not well known or widely used, so adding support for it here may help increase its adoption.
For further reference: https://stackoverflow.com/questions/32628845/is-there-a-way-to-add-extra-metadata-for-spark-dataframes
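To illustrate the current, fairly verbose approach (the column name and metadata value below are made up for the example):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder
// Build a Metadata object and re-alias the column with it.
def withAgeStats(df: DataFrame): DataFrame = {
  val meta = new MetadataBuilder()
    .putDouble("mean", 42.0) // illustrative value, not computed from the data
    .build()
  df.withColumn("age", col("age").as("age", meta))
}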
Could you add a build for Scala 2.12?
Validates that the field values are within a given range (with geq / gt / leq / lt comparisons). It is easy to implement, but it would be nice to wrap it in a function.
Something like -
validateRangeFields(Seq[Tuple(String, Numeric, Numeric, Boolean, Boolean)])
(sorry for the poor scala syntax, I'm a scala newbie)
Where the tuple fields stand for the column name, the lower and upper bounds, and the inclusivity flags (see the rough sketch below).
Thanks!
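A rough sketch of what such a helper could look like (hypothetical, not an existing spark-daria method; each tuple is assumed to be (columnName, lowerBound, upperBound, lowerInclusive, upperInclusive)):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
def validateRangeFields(df: DataFrame, ranges: Seq[(String, Double, Double, Boolean, Boolean)]): Unit = {
  ranges.foreach { case (name, lower, upper, lowerInclusive, upperInclusive) =>
    val lowerCond = if (lowerInclusive) col(name) >= lower else col(name) > lower
    val upperCond = if (upperInclusive) col(name) <= upper else col(name) < upper
    val violations = df.filter(!(lowerCond && upperCond)).count()
    if (violations > 0) {
      throw new IllegalArgumentException(
        s"Column '$name' has $violations value(s) outside the allowed range [$lower, $upper]"
      )
    }
  }
}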
I have tried using the camelCaseToSnakeCaseColumns transform but it doesn't seem to be working when combined with other transformations.
Here is the production code:
def standardRefinedColumnCleaning()(df: Dataset[Row]): Dataset[Row] = {
  df.transform(snakeCaseColumns())
    .transform(camelCaseToSnakeCaseColumns())
    .transform(sortColumns())
}
And the test case
it("should snake case columns for columns with spaces or camel cased") {
val someDF = Seq(
("foo", "bar")
).toDF("SomeColumn", "A b C")
val refinedDataSet = RefinedTransforms.standardRefinedColumnCleaning()(someDF)
assert(refinedDataSet.columns == Array("a_b_c", "some_column"))
}
The result is:
Expected: Array("a_b_c", "some_column")
Actual: Array("a_b_c", "somecolumn")
If I only run the camelCaseToSnakeCaseColumns transform, then it works.
I got a write exception when trying to write to S3 after running ParquetCompactor.
Update the README and wiki to document the createDF method
I tried to add this as a dependency in my project, but Maven couldn't resolve it, so I went to the URL and saw there was no directory for spark-daria. The JARs are not published.
Can you please publish them soon?
Thanks!
I'm watching this talk and they're discussing how they run validations that are specified in YAML files.
Would it be useful to add more validations to spark-daria? Should we have DariaValidator.validatesLengthIsLessThan("my_cool_column", 5) and DariaValidator.validatesBetween("my_integers", 3, 10) methods?
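For example, a validatesLengthIsLessThan check might look roughly like this (a hedged sketch; the method name and error format are assumptions, not the existing DariaValidator API):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length}
def validatesLengthIsLessThan(df: DataFrame, columnName: String, maxLength: Int): Unit = {
  // Fail fast when any value in the column has length >= the allowed maximum.
  val tooLong = df.filter(length(col(columnName)) >= maxLength).count()
  if (tooLong > 0L) {
    throw new IllegalArgumentException(
      s"Column '$columnName' has $tooLong value(s) with length >= $maxLength"
    )
  }
}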
BTW, all of the DataFrameValidator methods were copied over to the DariaValidator object due to a weird bug I ran into when using SBT shading with traits. Let me know if you're interested in understanding more about the SBT shading weirdness I uncovered.
When using the latest version:
Errors occurred while build effective model from C:\Users\igreenfield.gradle\caches\modules-2\files-2.1\mrpowers\spark-daria\0.31.0-s_2.11\c12b7d34b6165a7ad5de127edb9259fc094f3ae2\spark-daria-0.31.0-s_2.11.pom:
'repositories.repository.id' must be unique: SparkPackagesRepo -> https://dl.bintray.com/spark-packages/maven/ vs http://dl.bintray.com/spark-packages/maven/ in mrpowers:spark-daria:0.31.0-s_2.11
and in the pom file I see:
<repositories>
  <repository>
    <id>SparkPackagesRepo</id>
    <name>Spark Packages Repo</name>
    <url>https://dl.bintray.com/spark-packages/maven/</url>
    <layout>default</layout>
  </repository>
  <repository>
    <id>SparkPackagesRepo</id>
    <name>Spark Packages Repo</name>
    <url>http://dl.bintray.com/spark-packages/maven/</url>
    <layout>default</layout>
  </repository>
</repositories>
Try to create better error messages when the row sizes aren't equal to the number of columns with the createDF method.
Could you please point me to the right version for the mentioned Spark and Scala versions?
I am trying to write some test cases to validate the data between a .parquet file in S3 and the target (a Hive table). I have loaded the .parquet data into one DataFrame and the Hive table data into another DataFrame. When I try to compare the schemas of the two DataFrames using assertSmallDataFrameEquality, it fails even though the schemas are the same. I'm not sure why it is failing. Any suggestions would be helpful.
dl.bintray.com is unfortunately not accessible from many of the clients I work with. Is it possible to publish spark-daria to Maven Central?
I'm getting a strange error. I'm not a regular Scala user, so I may be doing something silly.
First, I start a Spark shell as follows:
spark-shell --packages "org.apache.hadoop:hadoop-aws:2.7.6,mrpowers:spark-daria:0.32.0-s_2.11"
Then I run this code:
scala> val df = spark.read.parquet("s3a://...")
[Stage 0:> (0 + 1)
df: org.apache.spark.sql.DataFrame = [... 96 more fields]
scala> import com.github.mrpowers.spark.daria.sql.DataFrameHelpers
import com.github.mrpowers.spark.daria.sql.DataFrameHelpers
scala> DataFrameHelpers.printAthenaCreateTable(
| df,
| "my.table",
| "s3a://..."
| )
java.io.FileNotFoundException: /Users/powers/Documents/code/my_apps/spark-daria/target/scala-2.11/scoverage-data/scoverage.measurements.1 (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileWriter.<init>(FileWriter.java:107)
at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
at scoverage.Invoker$.invoked(Invoker.scala:42)
at com.github.mrpowers.spark.daria.sql.DataFrameHelpers$.printAthenaCreateTable(DataFrameHelpers.scala:194)
... 53 elided
The reference to /Users/powers/ seems strange, and suggests some path from the project author's workstation got mistakenly baked into the package somehow.
Scalafmt is doing a bad job with some formatting - here's an example.
@snithish @gorros - Any idea how we can modify the scalafmt settings so the tests are formatted better?
cc: @kirubakarrs
Bump the version
Upload the latest version to Spark Packages
Update spark-spec to use the latest version of spark-daria
MrPowers,
I'm prepping to write my first Spark ETL job and your posts on the Medium blog are very helpful. I would be super excited to read the documentation from a working link and see how I can use spark-daria in my ETL job.
The documentation link is not working.
https://mrpowers.github.io/docs/spark_daria/index.html#package
@snithish @lizparody - I'd like to make the createDF method even better.
The createDF method currently works like this:
val expectedDF = spark.createDF(
  List(
    Row(8, 2.0),
    Row(64, 4.0),
    Row(-27, -3.0)
  ), List(
    StructField("num1", IntegerType, false),
    StructField("cube_root", DoubleType, true)
  )
)
I'd like to make it even more concise by allowing this syntax:
val expectedDF = spark.createDF(
  List(
    (8, 2.0),
    (64, 4.0),
    (-27, -3.0)
  ), List(
    ("num1", IntegerType, false),
    ("cube_root", DoubleType, true)
  )
)
Let me know if you know how to make both syntaxes work. I tried and failed.
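One possible way to support both syntaxes (a rough sketch under my own assumptions, not necessarily how spark-daria should implement it; the object and class names are illustrative) is to accept List[Any] and normalize each element:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructField, StructType}
object CreateDataFrameExt {
  implicit class CreateDFMethods(spark: SparkSession) {
    // Accept Rows, tuples, or single values as row data.
    private def asRows(values: List[Any]): List[Row] =
      values.map {
        case r: Row     => r
        case p: Product => Row(p.productIterator.toSeq: _*)
        case other      => Row(other)
      }
    // Accept StructFields or (name, type, nullable) tuples as the schema.
    private def asSchema(fields: List[Any]): List[StructField] =
      fields.map {
        case sf: StructField => sf
        case (name: String, dataType: DataType, nullable: Boolean) =>
          StructField(name, dataType, nullable)
        case other =>
          throw new IllegalArgumentException(s"Unsupported field spec: $other")
      }
    def createDF(rowData: List[Any], fields: List[Any]): DataFrame =
      spark.createDataFrame(
        spark.sparkContext.parallelize(asRows(rowData)),
        StructType(asSchema(fields))
      )
  }
}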
Do this: apache/spark#24232
@nvander1 - want to submit a PR for this one? cc: @manuzhang
We chatted about this work here: #80 (comment)
A lot of queries are pushed down to the database level when Snowflake is used as described in this blog post.
Joins, aggregations, and SQL functions are all pushed down and performed in the Snowflake database before data is sent to Spark.
I know some stuff gets pushed down to Postgres (column pruning), but are joins and aggregations being pushed down? @nvander1 - do you know what gets pushed down to Postgres? Is this something we could improve?
Some analyses could do a lot of stuff at the database level, only send a fraction of the data to the Spark cluster, and then probably perform a lot faster. Spark isn't the best at joins, so pushing those down to the database level would probably help a lot...
It would be nice to have a function that traverses the schema and applies some function to each column. For example, we could use it to make columns nullable or add metadata.
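A minimal sketch of such a helper (hypothetical name): it recursively walks the schema and applies a function to every StructField, descending into nested structs.
import org.apache.spark.sql.types.{StructField, StructType}
def transformSchema(schema: StructType)(f: StructField => StructField): StructType =
  StructType(schema.fields.map { field =>
    field.dataType match {
      case nested: StructType => f(field.copy(dataType = transformSchema(nested)(f)))
      case _                  => f(field)
    }
  })
// Example: make every column nullable.
// val nullableSchema = transformSchema(df.schema)(_.copy(nullable = true))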
https://help.github.com/en/articles/about-github-package-registry
Could be a cool place to host this project.
@afranzi - If you have a sec, can you please create a PR to add junit4git to this project? That's the library you mentioned in your talk, right?
Some guys from Blizzard were asking me about how to use junit4git after your talk - I told them you were the expert. We can hopefully show them how to use this workflow here. I need to learn it too!
spark-daria follows the standard Scala / Java deeply nested package convention, which is annoying when importing code.
Users currently need to import code like this: import com.github.mrpowers.spark.daria.sql.ColumnExt._
I noticed that some libraries are deviating from these Scala conventions and offering imports like this: import utest._
Maybe we can change the package structure so users can import code like import mrpowers.daria.sql.ColumnExt._? Thoughts @nvander1 / @manuzhang...?
dropDuplicates works with a list of string arguments. killDuplicates should also work with a list of string arguments.
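A rough sketch of a string-argument overload (hypothetical; it assumes killDuplicates removes every row whose values in the given columns occur more than once, dropping all copies rather than keeping one):
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}
def killDuplicates(df: DataFrame, columnNames: String*): DataFrame = {
  val cols: Seq[Column] = columnNames.map(col)
  df.withColumn("__group_count", count(lit(1)).over(Window.partitionBy(cols: _*)))
    .filter(col("__group_count") === 1)
    .drop("__group_count")
}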
Apache Spark itself is using these settings:
matrix:
  java: [ '1.8', '11' ]
  hadoop: [ 'hadoop-2.7', 'hadoop-3.2' ]
  exclude:
    - java: '11'
      hadoop: 'hadoop-2.7'
I think spark-daria can simply be tested with Java 1.8 and without any Hadoop specified, correct? I don't think we need to be testing multiple different Java / Hadoop versions.
I just learned that Java 8 and Java 1.8 are the same thing... what?!
I'm not even going to ask why Java 9 and Java 10 aren't included in this discussion. So confusing!!
Right now, containsColumn only takes a string argument (the column name). It'd be nice if the user could pass in a StructField argument.
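A minimal sketch of the requested overload (hypothetical; it treats a match as an exact StructField match on name, type, and nullability):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructField
def containsColumn(df: DataFrame, structField: StructField): Boolean =
  df.schema.fields.contains(structField)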
ParquetCompactor is not deleting old files and the input_file_name_parts directory on S3.
We are using the Databricks Spark platform, version 6.2, PySpark, and mrpowers:spark-daria:0.36.0-s_2.11. After running ParquetCompactor we have a new big Parquet file, but the old files and the input_file_name_parts directory still exist.
Is it not possible to use the ParquetCompactor on S3?
Is scoverage the best option?
@snithish - We need to figure out a good way to manage the spark-daria JAR files, so they're easily accessible for different versions of Scala and Spark.
I'm currently uploading the spark-daria JAR files to Spark Packages. Spark Packages seems to upload these to Maven somehow. Do you think we should continue using Spark Packages or migrate to Maven?
I like how spark-testing-base manages the JAR files in Maven. They make it easy to access different versions of spark-testing-base for different Spark, Scala, and spark-testing-base versions. Should we try to replicate the spark-testing-base approach?
Thanks!
We should make a major release to provide users with a stable public interface.
Here's how we can get this project ready for a major release:
See the discussion in this PR: #37 (comment)
This is for when the deploy script is run and the user forgets to bump the version first, a mistake I frequently make.
It seems we don't have a forum to ask questions, brainstorm, or share with each other. How about adding a https://gitter.im room?
@snithish - The DataFrame validation code is a very important portion of the spark-daria project: https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala
It's important to validate DataFrame dependencies for public DataFrame transformations, so users get descriptive error messages.
Can you please take a look at the code and let me know if you have any questions / comments? I'd like to get the Spark community to start using this DataFrame validation code. Thanks!
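For context, the pattern looks roughly like this (a simplified sketch, not the actual spark-daria code; the helper and transformation names are illustrative):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.concat_ws
// Fail fast with a descriptive message when a required column is missing.
def validatePresenceOfColumns(df: DataFrame, requiredColNames: Seq[String]): Unit = {
  val missing = requiredColNames.diff(df.columns.toSeq)
  if (missing.nonEmpty) {
    throw new IllegalArgumentException(
      s"The [${missing.mkString(", ")}] columns are missing from a DataFrame " +
        s"with the columns [${df.columns.mkString(", ")}]"
    )
  }
}
// A public transformation that validates its column dependencies up front.
def withFullName()(df: DataFrame): DataFrame = {
  validatePresenceOfColumns(df, Seq("first_name", "last_name"))
  df.withColumn("full_name", concat_ws(" ", df("first_name"), df("last_name")))
}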
Hi @MrPowers. I started a new Maven project with Spark 2.4 and Scala 2.11.12. Is there a corresponding build for this configuration?
When I add one more level of nested structure it fails to flatten.
"uses the StackOverflow answer format" - {
val data = Seq(
Row(
Row(
"this",
"is"
),
"something",
"cool",
";)"
)
)
val schema = StructType(
Seq(
StructField(
"foo",
StructType(
Seq(
StructField(
"bar",
StructType(
Seq(
StructField(
"zoo",
StringType,
true
)
)
)
),
StructField(
"baz",
StringType,
true
)
)
),
true
),
StructField(
"x",
StringType,
true
),
StructField(
"y",
StringType,
true
),
StructField(
"z",
StringType,
true
)
)
)
val df = spark
.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
.flattenSchema("_")
val expectedDF = spark.createDF(
List(("this", "is", "something", "cool", ";)")),
List(
("foo_bar_zoo", StringType, true),
("foo_baz", StringType, true),
("x", StringType, true),
("y", StringType, true),
("z", StringType, true)
)
)
assertSmallDataFrameEquality(
df,
expectedDF
)
}
Look into this and possibly remove TravisCI if it's too annoying.
@MrPowers, @nvander1,
do you think it's a good idea to add this project to https://github.com/fthomas/scala-steward to keep the dependencies up to date?
'repositories.repository.id' must be unique: SparkPackagesRepo -> https://dl.bintray.com/spark-packages/maven/ vs http://dl.bintray.com/spark-packages/maven/ in mrpowers:spark-daria:0.26.0