
spark-daria's Issues

Functions as column extensions

@snithish @gorros @kirubakarrs @oscarvarto - I’ve always thought it’s a bit random how Spark defines some functionality as Column functions and other functionality as SQL functions. Here’s an example of the inconsistency:

lower(col("blah").substr(0, 2))

Having two SQL functions would look like this:

lower(substr(col("blah"), 0, 2))

Having two Column functions would look like this:

col("blah").substr(0, 2).lower()

I like the Column function syntax, so I started monkey patching the SQL functions onto the Column class (see https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala). Let me know your thoughts.
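Here's a minimal sketch of the idea (not the actual FunctionsAsColumnExt code; the object and method names are just for illustration): an implicit class that exposes a SQL function as a Column method.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions

object ColumnFunctionsExt {

  implicit class ColumnMethods(col: Column) {
    // expose org.apache.spark.sql.functions.lower as a Column method
    def lower(): Column = functions.lower(col)
  }

}

// With the implicit in scope, the chained syntax works:
// import ColumnFunctionsExt._
// col("blah").substr(0, 2).lower()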

asSchema should have Product as upper type bound

A careless user like me may leave the parentheses out when defining the schema:

val df = spark.createDF(List("foo", "bar"),  List("data", StringType, true))

Currently, a MatchError is thrown at runtime.
Would it be better to report the error at compile time?
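A rough sketch of the suggested bound (the signature is illustrative, not the actual spark-daria code). With U bounded by Product, the flat List("data", StringType, true) infers U = Any and is rejected by the compiler instead of blowing up with a MatchError at runtime:

import org.apache.spark.sql.types.{DataType, StructField}

def asSchema[U <: Product](fields: List[U]): List[StructField] =
  fields.map {
    // a (String, DataType, Boolean) tuple is a Product, so this still matches
    case (name: String, dataType: DataType, nullable: Boolean) =>
      StructField(name, dataType, nullable)
  }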

@MrPowers

Add a SparkSessionExt `createDS` method

The createDF method is very useful.

I think a createDS method is needed as well, so we don't have to do this:

val sourceDS = spark.createDataset[Person](
  Seq(
    Person("Alice", 12),
    Person("Bob", 17)
  )
)

I'd rather do this:

val sourceDS = spark.createDS[Person](
  List(
    ("Alice", 12),
    ("Bob", 17)
  )
)
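A minimal sketch of what the extension method could look like (assumed, not an existing spark-daria API):

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

object SparkSessionExt {

  implicit class SparkSessionMethods(spark: SparkSession) {
    // build a Dataset[T] from in-memory data, mirroring the createDF style
    def createDS[T: Encoder](data: Seq[T]): Dataset[T] =
      spark.createDataset(data)
  }

}

Note that mapping bare tuples onto a case class, as in the proposed syntax above, would need extra conversion machinery; the sketch just accepts any type with an Encoder, so you'd pass Person instances (or plain tuples) directly.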

Saner setter and getter methods for schema metadata

Spark lets you attach metadata (primitive values) to the columns of a DataFrame. This is useful in ML and analytical tasks where you'd like to avoid recomputing basic statistics on the data. The current way to do this is very messy. The feature isn't well known or widely used, so adding saner setters and getters may help increase its adoption.

For further reference: https://stackoverflow.com/questions/32628845/is-there-a-way-to-add-extra-metadata-for-spark-dataframes
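For context, this is roughly what the stock Spark approach looks like today (the column name and statistic are made up for illustration):

import org.apache.spark.sql.types.MetadataBuilder

// df is assumed to be a DataFrame with an "age" column
val ageMetadata = new MetadataBuilder().putDouble("mean", 42.0).build()
val withMeta = df.withColumn("age", df("age").as("age", ageMetadata))

// reading the value back means digging through the schema
val mean = withMeta.schema("age").metadata.getDouble("mean")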

Feature request - range validator

Validates that the field values are within a given range (with ≥ / > / ≤ / < comparisons at each end). It is easy to implement, but it would be nice to wrap it in a function.
Something like:
validateRangeFields(Seq[(String, Numeric, Numeric, Boolean, Boolean)])

(sorry for the poor Scala syntax, I'm a Scala newbie)

Where the tuple fields stands for -

  • column name
  • range start
  • range end
  • start inclusive (false implies > check, true implies >= check)
  • end inclusive (false implies < check, true implies <= check)
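Here's a rough sketch of what such a validator could look like (the name, the Double element type, and the error handling are all just assumptions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def validateRangeFields(df: DataFrame, specs: Seq[(String, Double, Double, Boolean, Boolean)]): Unit = {
  specs.foreach { case (colName, start, end, startInclusive, endInclusive) =>
    val lowerOk = if (startInclusive) col(colName) >= start else col(colName) > start
    val upperOk = if (endInclusive) col(colName) <= end else col(colName) < end
    val violations = df.filter(!(lowerOk && upperOk)).count()
    if (violations > 0) {
      throw new IllegalArgumentException(
        s"Column '$colName' has $violations value(s) outside the range $start to $end"
      )
    }
  }
}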

Thanks!

camelCaseToSnakeCaseColumns doesn't seem to work

I have tried using the camelCaseToSnakeCaseColumns transform, but it doesn't seem to work when combined with other transformations.

Here is the production code:

def standardRefinedColumnCleaning()(df: Dataset[Row]): Dataset[Row] = {
  df.transform(snakeCaseColumns())
    .transform(camelCaseToSnakeCaseColumns())
    .transform(sortColumns())
}

And the test case

it("should snake case columns for columns with spaces or camel cased") {
      val someDF = Seq(
        ("foo", "bar")
      ).toDF("SomeColumn", "A b C")

      val refinedDataSet = RefinedTransforms.standardRefinedColumnCleaning()(someDF)

      assert(refinedDataSet.columns == Array("a_b_c", "some_column"))
    }

The result is

Expected :Array("a_b_c", "some_column")
Actual   :Array("a_b_c", "somecolumn")

If I run only camelCaseToSnakeCaseColumns, it works.
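A possible workaround to try (this is an assumption based on the Expected/Actual output, not a confirmed diagnosis): it looks like snakeCaseColumns lowercases "SomeColumn" to "somecolumn" before the camelCase transform runs, so applying the camelCase conversion first may give the expected result:

def standardRefinedColumnCleaning()(df: Dataset[Row]): Dataset[Row] = {
  df.transform(camelCaseToSnakeCaseColumns())
    .transform(snakeCaseColumns())
    .transform(sortColumns())
}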

Should we add more validators?

I'm watching this talk and they're discussing how they run validations that are specified in YAML files.

Would it be useful to add more validations to spark-daria? Should we add DariaValidator.validatesLengthIsLessThan("my_cool_column", 5) and DariaValidator.validatesBetween("my_integers", 3, 10) methods?
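Rough sketches of what those two could look like (the signatures and error handling are placeholders, not decided API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length}

def validatesLengthIsLessThan(df: DataFrame, colName: String, maxLength: Int): Unit = {
  val tooLong = df.filter(length(col(colName)) >= maxLength).count()
  require(tooLong == 0, s"Column '$colName' has $tooLong value(s) with length >= $maxLength")
}

def validatesBetween(df: DataFrame, colName: String, lower: Int, upper: Int): Unit = {
  val outOfRange = df.filter(col(colName) < lower || col(colName) > upper).count()
  require(outOfRange == 0, s"Column '$colName' has $outOfRange value(s) outside [$lower, $upper]")
}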

BTW, all of the DataFrameValidator methods were copied over to the DariaValidator object due to a weird bug I ran into when using SBT shading with traits. Let me know if you're interested in understanding more about the SBT shading weirdness I uncovered.

duplicate repo entries in pom file

When using the latest version, the build reports:
Errors occurred while build effective model from C:\Users\igreenfield.gradle\caches\modules-2\files-2.1\mrpowers\spark-daria\0.31.0-s_2.11\c12b7d34b6165a7ad5de127edb9259fc094f3ae2\spark-daria-0.31.0-s_2.11.pom:
'repositories.repository.id' must be unique: SparkPackagesRepo -> https://dl.bintray.com/spark-packages/maven/ vs http://dl.bintray.com/spark-packages/maven/ in mrpowers:spark-daria:0.31.0-s_2.11

and in the pom file I see:

<repositories>
        <repository>
            <id>SparkPackagesRepo</id>
            <name>Spark Packages Repo</name>
            <url>https://dl.bintray.com/spark-packages/maven/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>SparkPackagesRepo</id>
            <name>Spark Packages Repo</name>
            <url>http://dl.bintray.com/spark-packages/maven/</url>
            <layout>default</layout>
        </repository>
    </repositories>

DataFrame schema comparison fails with assertSmallDataFrameEquality method, even though the schema is the same

I am trying to write some test cases to validate the data between a .parquet file in S3 and a target Hive table. I have loaded the .parquet data into one DataFrame and the Hive table data into another DataFrame. When I now try to compare the two DataFrames using 'assertSmallDataFrameEquality', the schema comparison fails, even though the schema is the same. Not sure why it is failing. Any suggestions would be helpful.
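One common cause of schemas that look identical but fail comparison is the nullable flag: Parquet-derived and Hive-derived schemas often disagree on nullability even when the names and types match. assertSmallDataFrameEquality comes from spark-fast-tests, and as far as I recall it accepts an ignoreNullable flag; please verify the exact parameter against the version you're using:

// assumption: the spark-fast-tests signature exposes an ignoreNullable flag
assertSmallDataFrameEquality(parquetDF, hiveDF, ignoreNullable = true)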

Publish into maven central

dl.bintray.com is unfortunately not accessible from many of the clients I work with. Is it possible to publish spark-daria to Maven Central?

Strange FileNotFoundException when running printAthenaCreateTable

I'm getting a strange error. I'm not a regular Scala user, so I may be doing something silly.

First, I start a Spark shell as follows:

spark-shell --packages "org.apache.hadoop:hadoop-aws:2.7.6,mrpowers:spark-daria:0.32.0-s_2.11"

Then I run this code:

scala> val df = spark.read.parquet("s3a://...")
df: org.apache.spark.sql.DataFrame = [... 96 more fields]

scala> import com.github.mrpowers.spark.daria.sql.DataFrameHelpers
import com.github.mrpowers.spark.daria.sql.DataFrameHelpers

scala> DataFrameHelpers.printAthenaCreateTable(
     |     df,
     |     "my.table",
     |     "s3a://..."
     | )
java.io.FileNotFoundException: /Users/powers/Documents/code/my_apps/spark-daria/target/scala-2.11/scoverage-data/scoverage.measurements.1 (No such file or directory)
  at java.io.FileOutputStream.open0(Native Method)
  at java.io.FileOutputStream.open(FileOutputStream.java:270)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
  at java.io.FileWriter.<init>(FileWriter.java:107)
  at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
  at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
  at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
  at scoverage.Invoker$.invoked(Invoker.scala:42)
  at com.github.mrpowers.spark.daria.sql.DataFrameHelpers$.printAthenaCreateTable(DataFrameHelpers.scala:194)
  ... 53 elided

The reference to /Users/powers/ seems strange. The scoverage frames in the stack trace suggest the published JAR was built with scoverage instrumentation enabled, so at runtime it tries to write coverage measurements to a path from the project author's workstation.

Make the createDF method even more concise

@snithish @lizparody - I'd like to make the createDF method even better.

The createDF method currently works like this:

val expectedDF = spark.createDF(
  List(
    Row(8, 2.0),
    Row(64, 4.0),
    Row(-27, -3.0)
  ), List(
    StructField("num1", IntegerType, false),
    StructField("cube_root", DoubleType, true)
  )
)

I'd like to make it even more concise by allowing this syntax:

val expectedDF = spark.createDF(
  List(
   (8, 2.0),
   (64, 4.0),
   (-27, -3.0)
  ), List(
   ("num1", IntegerType, false),
   ("cube_root", DoubleType, true)
  )
)

Let me know if you know how to make both syntaxes work. I tried and failed.
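Here's a rough sketch (not working library code) of the tuple-based variant on its own; all names are illustrative:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object TupleCreateDF {

  implicit class SparkSessionTupleOps(spark: SparkSession) {
    def createDF(rowData: List[Product], fields: List[(String, DataType, Boolean)]): DataFrame = {
      val rows = rowData.map(p => Row(p.productIterator.toSeq: _*)) // tuples -> Rows
      val schema = StructType(fields.map { case (name, dataType, nullable) =>
        StructField(name, dataType, nullable)
      })
      spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    }
  }

}

The painful part is keeping the existing List[Row] / List[StructField] overload alongside this one: both overloads erase to (List, List), so the compiler rejects them as a double definition, which is probably why making both syntaxes work on a single method name is hard.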

Enhancing query pushdown in Postgres...

A lot of queries are pushed down to the database level when Snowflake is used, as described in this blog post.

Joins, aggregations, and SQL functions are all pushed down and performed in the Snowflake database before data is sent to Spark.

I know some stuff gets pushed down to Postgres (column pruning), but are joins and aggregations being pushed down? @nvander1 - do you know what gets pushed down to Postgres? Is this something we could improve?

Some analyses could do a lot of the work at the database level, send only a fraction of the data to the Spark cluster, and probably run a lot faster. Spark isn't the best at joins, so pushing those down to the database level would probably help a lot...

Function to traverse dataframe schema.

It would be nice to have a function that traverses the schema and applies some function to each column. For example, we could use it to make columns nullable or to add metadata.
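A minimal sketch of the idea (assumed, not an existing spark-daria API): recursively apply a function to every StructField, descending into nested structs.

import org.apache.spark.sql.types.{StructField, StructType}

def transformSchema(schema: StructType)(f: StructField => StructField): StructType =
  StructType(schema.fields.map { field =>
    field.dataType match {
      case nested: StructType => f(field.copy(dataType = transformSchema(nested)(f)))
      case _                  => f(field)
    }
  })

// example: make every column nullable
// val relaxed = transformSchema(df.schema)(_.copy(nullable = true))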

Add junit4git

@afranzi - If you have a sec, can you please create a PR to add junit4git to this project? That's the library you mentioned in your talk, right?

Some guys from Blizzard were asking me about how to use junit4git after your talk - I told them you were the expert. We can hopefully show them how to use this workflow here. I need to learn it too!

Deeply nested package structure

spark-daria follows the standard Scala / Java deeply nested package convention, which is annoying when importing code.

Users currently need to import code like this: import com.github.mrpowers.spark.daria.sql.ColumnExt._

I noticed that some libraries are deviating from these Scala conventions and offering imports like this: import utest._.

Maybe we can change the package structure so users can import code like import mrpowers.daria.sql.ColumnExt._? Thoughts @nvander1 / @manuzhang...?

Java, JDK and Hadoop versions for CI

Apache Spark itself is using these settings:

matrix:
  java: [ '1.8', '11' ]
  hadoop: [ 'hadoop-2.7', 'hadoop-3.2' ]
  exclude:
  - java: '11'
    hadoop: 'hadoop-2.7'

I think spark-daria can simply be tested with Java 1.8 and without specifying a Hadoop version, correct? I don't think we need to test multiple Java / Hadoop versions.

I just learned that Java 8 and Java 1.8 are the same thing... what?!

I'm not even going to ask why Java 9 and Java 10 aren't included in this discussion. So confusing!!

ParquetCompactor is not deleting old files and the input_file_name_parts directory on S3

ParquetCompactor is not deleting old files and the input_file_name_parts directory on S3.

We are using the Databricks platform (runtime 6.2), PySpark, and mrpowers:spark-daria:0.36.0-s_2.11. After running ParquetCompactor we have a new big Parquet file, but the old files and the input_file_name_parts directory still exist.

Is it not possible to use the ParquetCompactor on S3?

Managing JAR files

@snithish - We need to figure out a good way to manage the spark-daria JAR files, so they're easily accessible for different versions of Scala and Spark.

I'm currently uploading the spark-daria JAR files to Spark Packages. Spark Packages seems to upload these to Maven somehow. Do you think we should continue using Spark Packages or migrate to Maven?

I like how spark-testing-base manages the JAR files in Maven. They make it easy to access different versions of spark-testing-base for different Spark, Scala, and spark-testing-base versions. Should we try to replicate the spark-testing-base approach?

Thanks!

1.0 major release

We should make a major release to provide users with a stable public interface.

Here's how we can get this project ready for a major release:

  • Go through the entire codebase and see if any methods should be deleted
  • Make sure all issues are incorporated in the project or closed
  • Improve the code quality when necessary
  • Make sure all private methods are flagged accordingly and the documentation is up to date
  • Create a more official release process going forward

Get feedback on DataFrame validation code

@snithish - The DataFrame validation code is a very important portion of the spark-daria project: https://github.com/MrPowers/spark-daria/blob/master/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala

It's important to validate DataFrame dependencies for public DataFrame transformations, so users get descriptive error messages.
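For reviewers who haven't used it yet, here's roughly how the validation is meant to be used inside a public transformation (validatePresenceOfColumns is from the linked file; the surrounding transformation is just an illustration):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}

// assumes the DataFrameValidator trait is mixed in so validatePresenceOfColumns is in scope
def withFullName()(df: DataFrame): DataFrame = {
  validatePresenceOfColumns(df, Seq("first_name", "last_name")) // fail fast with a descriptive error
  df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
}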

Can you please take a look at the code and let me know if you have any questions / comments? I'd like to get the Spark community to start using this DataFrame validation code. Thanks!

Three level nested structure flattening fails

When I add one more level of nesting, the structure fails to flatten.

"uses the StackOverflow answer format" - {

        val data = Seq(
          Row(
            Row(
              "this",
              "is"
            ),
            "something",
            "cool",
            ";)"
          )
        )

        val schema = StructType(
          Seq(
            StructField(
              "foo",
              StructType(
                Seq(
                  StructField(
                    "bar",
                    StructType(
                      Seq(
                        StructField(
                          "zoo",
                          StringType,
                          true
                        )
                      )
                    )
                  ),
                  StructField(
                    "baz",
                    StringType,
                    true
                  )
                )
              ),
              true
            ),
            StructField(
              "x",
              StringType,
              true
            ),
            StructField(
              "y",
              StringType,
              true
            ),
            StructField(
              "z",
              StringType,
              true
            )
          )
        )

        val df = spark
          .createDataFrame(
            spark.sparkContext.parallelize(data),
            StructType(schema)
          )
          .flattenSchema("_")

        val expectedDF = spark.createDF(
          List(("this", "is", "something", "cool", ";)")),
          List(
            ("foo_bar_zoo", StringType, true),
            ("foo_baz", StringType, true),
            ("x", StringType, true),
            ("y", StringType, true),
            ("z", StringType, true)
          )
        )

        assertSmallDataFrameEquality(
          df,
          expectedDF
        )

      }
