
Frameless


Frameless is a Scala library for working with Spark using more expressive types. It consists of the following modules:

  • frameless-dataset for a more strongly typed Dataset/DataFrame API
  • frameless-ml for a more strongly typed Spark ML API based on frameless-dataset
  • frameless-cats for using Spark's RDD API with cats

Note that while Frameless is still getting off the ground, it is very possible that breaking changes will be made for at least the next few versions.

The Frameless project and contributors support the Typelevel Code of Conduct and want all its associated channels (e.g. GitHub, Discord) to be a safe and friendly environment for contributing and learning.

Versions and dependencies

The compatible versions of Spark and cats are as follows:

Frameless   Spark                    Cats       Cats-Effect   Scala
0.16.0      3.5.0 / 3.4.0 / 3.3.0    2.x        3.x           2.12 / 2.13
0.15.0      3.4.0 / 3.3.0 / 3.2.2    2.x        3.x           2.12 / 2.13
0.14.1      3.4.0 / 3.3.0 / 3.2.2    2.x        3.x           2.12 / 2.13
0.14.0      3.3.0 / 3.2.2 / 3.1.3    2.x        3.x           2.12 / 2.13
0.13.0      3.3.0 / 3.2.2 / 3.1.3    2.x        3.x           2.12 / 2.13
0.12.0      3.2.1 / 3.1.3 / 3.0.3    2.x        3.x           2.12 / 2.13
0.11.1      3.2.0 / 3.1.2 / 3.0.1    2.x        2.x           2.12 / 2.13
0.11.0*     3.2.0 / 3.1.2 / 3.0.1    2.x        2.x           2.12 / 2.13
0.10.1      3.1.0                    2.x        2.x           2.12
0.9.0       3.0.0                    1.x        1.x           2.12
0.8.0       2.4.0                    1.x        1.x           2.11 / 2.12
0.7.0       2.3.1                    1.x        1.x           2.11
0.6.1       2.3.0                    1.x        0.8           2.11
0.5.2       2.2.1                    1.x        0.8           2.11
0.4.1       2.2.0                    1.x        0.8           2.11
0.4.0       2.2.0                    1.0.0-IF   0.4           2.11

* 0.11.0 has broken Spark 3.1.2 and 3.0.1 artifacts published.

Starting with 0.11, we introduced Spark cross-published artifacts:

  • By default, frameless artifacts depend on the most recent Spark version
  • Suffix -spark{major}{minor} is added to artifacts that are released for the previous Spark version(s)

Examples of artifact names:

  • frameless-dataset (the latest Spark dependency)
  • frameless-dataset-spark33 (Spark 3.3.x dependency)
  • frameless-dataset-spark32 (Spark 3.2.x dependency)
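
For example, a hedged build.sbt fragment for pinning the Spark 3.3.x build (the version value is a placeholder; pick a release from the table above, and the same -spark33 suffix applies to the other modules):

// build.sbt — minimal sketch for depending on a cross-published artifact
val framelessVersion = "<latest version>" // placeholder

libraryDependencies += "org.typelevel" %% "frameless-dataset-spark33" % framelessVersion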

Versions 0.5.x and 0.6.x have identical features. The first is compatible with Spark 2.2.1 and the second with 2.3.0.

The only dependency of the frameless-dataset module is shapeless 2.3.2. Therefore, depending on frameless-dataset adds minimal overhead to your Spark application's jar. Only the frameless-cats module depends on cats and cats-effect, so if you prefer to work only with Datasets and not with RDDs, you may choose not to depend on frameless-cats.

Frameless intentionally does not have a compile-time dependency on Spark. This essentially allows you to use any version of Frameless with any version of Spark. The table above simply lists the versions of Spark we officially compile and test Frameless against; other versions will likely work as well.

Breaking changes in 0.9

  • Spark 3 introduces a new ExpressionEncoder approach; the schema for single-value DataFrames is now "value" rather than "_1".

Why?

Frameless introduces a new Spark API, called TypedDataset. The benefits of using TypedDataset compared to the standard Spark Dataset API are as follows:

  • Typesafe column referencing (e.g., no more runtime errors when accessing non-existing columns)
  • Customizable, typesafe encoders (e.g., if a type does not have an encoder, it should not compile)
  • Enhanced type signature for built-in functions (e.g., if you apply an arithmetic operation on a non-numeric column, you get a compilation error)
  • Typesafe casting and projections

Click here for a detailed comparison of TypedDataset with Spark's Dataset API.
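
For instance, a minimal sketch of typesafe column referencing (the Apartment case class and data are made up for illustration, and an implicit SparkSession is assumed to be in scope):

import frameless.TypedDataset

case class Apartment(city: String, surface: Int, price: Double)

val apartments = TypedDataset.create(Seq(Apartment("Paris", 50, 300000.0)))

apartments.select(apartments('city))      // compiles: 'city is a column of Apartment
// apartments.select(apartments('surfac)) // does not compile: no such column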

Documentation

Quick Start

Since the 0.9.x release, Frameless is no longer published for Scala 2.11.x (see the table above for the Scala versions each release supports).

To use Frameless in your project add the following in your build.sbt file as needed:

val framelessVersion = "<latest version>"

resolvers ++= Seq(
  // for snapshot artifacts only
  "s01-oss-sonatype" at "https://s01.oss.sonatype.org/content/repositories/snapshots"
)

libraryDependencies ++= List(
  "org.typelevel" %% "frameless-dataset" % framelessVersion,
  "org.typelevel" %% "frameless-ml"      % framelessVersion,
  "org.typelevel" %% "frameless-cats"    % framelessVersion
)

An easy way to bootstrap a Frameless sbt project:

  • if you have Giter8 installed then simply:
g8 imarios/frameless.g8
  • with sbt >= 0.13.13:
sbt new imarios/frameless.g8

Typing sbt console inside your project will bring up a shell with Frameless and all its dependencies loaded (including Spark).

Need help?

Feel free to message us on our Discord channel with any issues or questions.

Development

We require at least one sign-off (thumbs-up, +1, or similar) to merge pull requests. The current maintainers (people who can merge pull requests) are:

Testing

Frameless contains several property tests. To avoid OutOfMemoryErrors, we tune the default generator sizes. The following environment variables may be set to adjust the size of generated collections in the TypedDataset suite:

Property                   Default
FRAMELESS_GEN_MIN_SIZE     0
FRAMELESS_GEN_SIZE_RANGE   20
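
As a hedged sketch, these could be set from the sbt build for forked test JVMs (envVars only takes effect when the tests are forked):

// build.sbt — assumption: tests run in a forked JVM
Test / fork := true
Test / envVars ++= Map(
  "FRAMELESS_GEN_MIN_SIZE"   -> "0",
  "FRAMELESS_GEN_SIZE_RANGE" -> "10"
)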

License

Code is provided under the Apache 2.0 license available at http://opensource.org/licenses/Apache-2.0, as well as in the LICENSE file. This is the same license used by Spark.

Contributors

adelbertc, armanbilge, atamborrino, avasil, ayoub-benali, bamine, cchantep, ceedubs, chris-twiner, combinatorist, crossy147, frosforever, grafblutwurst, imarios, jeremyrsmith, julien-truffaut, kanterov, kmate, kujon, larsrh, martin1keogh, mt40, olivierblanvillain, pomadchin, rossabaker, rrjohnson, scala-steward, typelevel-steward[bot], vejeta, xuwei-k


frameless's Issues

Bug: Sum of Int returns Long in Spark but we return Int.

This leads to issues:

scala> val other = x.groupBy( x('_2) ).agg( sum(x('_1)) )
other: frameless.TypedDataset[(String, Int)] = [_1: string, _2: bigint]

As you can see, Spark treats _2 as bigint whereas we return Int.

We should address this before 0.2

Support more agg functions

There are many of them in Spark. I think we should add a few more popular ones (like count distinct, etc.) and then maybe provide an easy way to create new ones on the fly (if someone needs one we do not have). We can use a strategy similar to what we did with UDFs.

When I'm trying to save a Long field in a table, it is converted into BigInt.

issue:

val p = TypedDataset.create[Basis](Basis)

val a = p.col[Long]('a)
val b = p.col[String]('b)

case class pay(dateOf: Long, str: Option[String])

p
      .select(a, b).as[pay]
      .dataset.write.mode("overwrite").format("parquet").saveAsTable(fullViewName)

But when viewed in the Hive table:

hive> show create table _.testTypedDS;
OK
CREATE TABLE `_.testTypedDS`(
  `dateof` bigint,
  `str` string)

New release - 0.3.0?

It would be ✨ great ✨ to get a new release that rolls up the changes since Dec 2016, which was when 0.2.0 was cut.

In particular, this would pick up changes that bring compatibility with Spark 2.1.0, so I wouldn't have to depend directly on the Git repo (see: https://gitter.im/typelevel/frameless?at=58d981ddb52518ed4db2fbfe), which for obvious reasons isn't the best thing in the world.

How about cutting a 0.3.0?

CodeGen fails when case class fields are reserved java keywords

It is possible to define a case class with reserved field names using back-ticks.

case class Foo(a: String, `if`: Int)
val t = TypedDataset.create(Seq(Foo("a",2), Foo("b",2)))

Fails with the following error:

17/06/01 00:45:54 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 49, Column 44: Unexpected selector 'if' after "."
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private UnsafeRow result;
/* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
/* 011 */
/* 012 */
/* 013 */   public SpecificUnsafeProjection(Object[] references) {
/* 014 */     this.references = references;
/* 015 */     result = new UnsafeRow(2);
/* 016 */     this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
/* 017 */     this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
/* 018 */   }
/* 019 */
/* 020 */   // Scala.Function1 need this
/* 021 */   public java.lang.Object apply(java.lang.Object row) {
/* 022 */     return apply((InternalRow) row);
/* 023 */   }
/* 024 */
/* 025 */   public UnsafeRow apply(InternalRow i) {
/* 026 */     holder.reset();
/* 027 */
/* 028 */     rowWriter.zeroOutNullBytes();
/* 029 */
/* 030 */
/* 031 */     $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo value2 = ($line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo)i.get(0, null);
/* 032 */
/* 033 */     boolean isNull1 = false;
/* 034 */     final java.lang.String value1 = isNull1 ? null : (java.lang.String) value2.a();
/* 035 */     isNull1 = value1 == null;
/* 036 */     boolean isNull = isNull1;
/* 037 */     final UTF8String value = isNull ? null : org.apache.spark.unsafe.types.UTF8String.fromString(value1);
/* 038 */     isNull = value == null;
/* 039 */     if (isNull) {
/* 040 */       rowWriter.setNullAt(0);
/* 041 */     } else {
/* 042 */       rowWriter.write(0, value);
/* 043 */     }
/* 044 */
/* 045 */
/* 046 */     $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo value4 = ($line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo)i.get(0, null);
/* 047 */
/* 048 */     boolean isNull3 = false;
/* 049 */     final int value3 = isNull3 ? -1 : value4.if();
/* 050 */     if (isNull3) {
/* 051 */       rowWriter.setNullAt(1);
/* 052 */     } else {
/* 053 */       rowWriter.write(1, value3);
/* 054 */     }
/* 055 */     result.setTotalSize(holder.totalSize());
/* 056 */     return result;
/* 057 */   }
/* 058 */ }

Bug: Projection operations (via select) propagate problematic column names that lead to run-time failures

Better to show this with an example:

scala> case class Foo(i: Int)
defined class Foo

scala> val e = TypedDataset.create[Foo]( Foo(1) :: Nil )
e: frameless.TypedDataset[Foo] = [i: int]

scala> val t = e.select( e.col('i) , e.col('i) )
t: frameless.TypedDataset[(Int, Int)] = [i: int, i: int]

scala> t.col('_1)  // even though t has type TypedDataset[(Int, Int)] 
org.apache.spark.sql.AnalysisException: Cannot resolve column name "_1" among (i, i);
  at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
  at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.Dataset.resolve(Dataset.scala:219)
  at org.apache.spark.sql.Dataset.col(Dataset.scala:921)
  at frameless.TypedDataset.col(TypedDataset.scala:64)
  ... 42 elided

scala> case class Bar(i: Int, j: Int)
defined class Bar

scala> val tb = t.as[Bar]
tb: frameless.TypedDataset[Bar] = [i: int, i: int]

scala> tb.select(tb.col('j)) // even though tb is of type TypedDataset[Bar]
org.apache.spark.sql.AnalysisException: Cannot resolve column name "j" among (i, i);
  at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
  at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.Dataset.resolve(Dataset.scala:219)
  at org.apache.spark.sql.Dataset.col(Dataset.scala:921)
  at frameless.TypedDataset.col(TypedDataset.scala:64)
  ... 42 elided

Alternate column referencing syntax for TypedDataset

It would be nice to add an alternate column referencing syntax to the TypedDataset API which is closer to the vanilla syntax, similar to the way it's done in TypedDataFrame.

Currently it looks like: (source)

val dataset = TypedDataset.create(data)
val A = dataset.col[A]('a)
val B = dataset.col[B]('b)

val dataset2 = dataset.select(A, B).collect().run().toVector

I think it should be possible to change it to something like:

val dataset = TypedDataset.create(data)
val dataset2 = dataset.select('a, 'b).collect().run().toVector

It would be also interesting to investigate an alternate syntax for td.colMany('b, 'b) (equivalent to accessing _.b.b).

Create PDF documentation

PDF documentation would be useful when there's no internet connection.

I've created some scripts which work well enough for cats, dogs and fetch, as shown below:
https://github.com/frgomes/debian-bin/blob/master/bash_30pdf.sh
https://github.com/frgomes/debian-bin/blob/master/bash_30httrack.sh
https://github.com/frgomes/debian-bin/blob/master/bash_31makepdf_cats.sh
https://github.com/frgomes/debian-bin/blob/master/bash_31makepdf_dogs.sh
https://github.com/frgomes/debian-bin/blob/master/bash_31makepdf_fetch.sh

However, a similar script for Frameless...
https://github.com/frgomes/debian-bin/blob/master/bash_31makepdf_frameless.sh
... does not work well because Frameless employs frames (sic!). In more detail: PDF files are created from HTML files but nothing can be seen since contents are hidden inside a collapsed frame.

Frameless breaks with Spark 2.1.1

Reproduce:

  • Install Spark 2.1.1
  • Execute:

val lines = spark.read.textFile(fileName).typed
lines.show

Expected: TypedDataset[String]

Result:

java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
frameless/TypedDataset.joinLeft(Lframeless/TypedDataset;Lframeless/TypedColumn;Lframeless/TypedColumn;Lframeless/TypedEncoder;Lframeless/TypedEncoder;)Lframeless/TypedDataset; @178: invokevirtual
Reason:
Type 'org/apache/spark/sql/catalyst/expressions/CreateStruct' (current frame, stack[8]) is not assignable to 'org/apache/spark/sql/catalyst/expressions/Expression'
Current Frame:
bci: @178
flags: { }
locals: { 'frameless/TypedDataset', 'frameless/TypedDataset', 'frameless/TypedColumn', 'frameless/TypedColumn', 'frameless/TypedEncoder', 'frameless/TypedEncoder', 'org/apache/spark/sql/catalyst/plans/logical/LogicalPlan', 'org/apache/spark/sql/catalyst/plans/logical/LogicalPlan', 'org/apache/spark/sql/catalyst/expressions/EqualTo', 'org/apache/spark/sql/catalyst/plans/logical/Join', 'org/apache/spark/sql/execution/QueryExecution', 'scala/collection/Seq', 'scala/collection/Seq', top, 'org/apache/spark/sql/catalyst/expressions/CreateStruct', 'java/lang/String' }
stack: { uninitialized 140, uninitialized 140, 'scala/collection/immutable/List$', 'scala/Predef$', '[Lorg/apache/spark/sql/catalyst/expressions/Alias;', '[Lorg/apache/spark/sql/catalyst/expressions/Alias;', integer, 'org/apache/spark/sql/catalyst/expressions/Alias$', 'org/apache/spark/sql/catalyst/expressions/CreateStruct', 'java/lang/String' }

Diagnosis: Upon request I installed Spark 2.0.2 and reran my code and it works flawlessly. Something in Spark 2.1.1 seems to break Frameless.

0.4 Release: Deprecate un-optimized forwarded methods such as map/filter

I believe we should not allow map(f: T => U) and filter(f: T => Boolean) on a TypedDataset[T]. These are really misleading. Those functions end up being forwarded directly to the underlying RDD and remain unoptimized. The same problems that RDDs had in terms of performance are showing up here again.

We should let Frameless users use the standard Frameless/DataFrame APIs to do these basic operations: use select() instead of map(), use the Frameless filter that operates on TypedColumns rather than the filter that takes a T => Boolean closure, and mix in UDFs if you want to do things in Scala.

I believe these two methods (map/filter), and potentially others, are quite misleading and should not be surfaced as top-level APIs for TypedDataset.
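
To make the suggestion concrete, a hedged sketch of the column-based alternatives (Person is a made-up case class, and an implicit SparkSession is assumed):

import frameless.TypedDataset

case class Person(name: String, age: Int)

val people = TypedDataset.create(Seq(Person("a", 20), Person("b", 35)))

// Instead of people.filter(_.age > 30), which forwards an opaque closure:
people.filter(people('age) > 30)

// Instead of people.map(_.age), which also forwards an opaque closure:
people.select(people('age))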

Support for UDAF

Currently we have support only for UDFs. UDAFs are really useful, and it would be nice if we could support them.

Relevant scala docs and this. The second seems to be the preferred way to do this with Datasets.

The Aggregator:

trait Aggregator[IN,BUF,OUT]
  • IN The input type for the aggregation.
  • BUF The type of the intermediate value of the reduction.
  • OUT The type of the final output result.

Example from SO:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  val zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)

Examples from spark

Other example from Databricks:

val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
  def zero: Int = 0                     // The initial value.
  def reduce(b: Int, a: Int) = b + a    // Add an element to the running total
  def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
  def finish(b: Int) = b                // Return the final result.
}.toColumn

val ds = Seq(1, 2, 3, 4).toDS()
ds.select(simpleSum).collect

todo: Create a TypedAggregator that returns a TypedColumn.

The entire construct seems to be quite type safe. This could be really simple.
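
A hedged sketch of what such a wrapper could look like (illustrative names only, not an actual frameless API): reuse Spark's Aggregator and attach a frameless-typed column at the boundary.

import frameless.{TypedColumn, TypedEncoder}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical wrapper: leaves Aggregator's methods abstract and only adds a typed view.
abstract class TypedAggregator[IN, BUF, OUT: TypedEncoder]
    extends Aggregator[IN, BUF, OUT] with Serializable {

  // T is the schema of the TypedDataset the aggregation will run on.
  def toTypedColumn[T]: TypedColumn[T, OUT] =
    new TypedColumn[T, OUT](this.toColumn)
}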

Better bounded types for TypedDatasets

Currently Spark's Tungsten encoder uses a mutable.WrappedArray to represent all sequences.

Unfortunately, both Spark and Frameless allow Datasets to have arbitrary collections as their types (e.g., Dataset[List[Int]], TypedDataset[Vector[Int]], etc.) even though internally Spark doesn't really know anything about these types. Internally, it treats all collections as Dataset[WrappedArray[Int]].

Ok, so what's the big deal? Well, things get tricky when you introduce UDFs into the picture.

val f = TypedDataset.create((1,Vector(2,2,3)) :: (1,Vector(2,4)) :: Nil)
val myUdf = f.makeUDF( (v: Vector[Int]) => v.sum )
f.select( f('_1), myUdf(f('_2))).show().run()

This should work, no? Well, it compiles just fine, but it will blow up at run-time.

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Vector

My proposal here is to find a way to get this working in a type-safe manner. One solution is to work on the internals of the UDF code to make this work (@kanterov?).

Another way, which is what I am currently thinking about, is to control the creation of the TypedDataset. That is, if we only allow the types to be (say) Dataset[WrappedArray[Int]] then everything should work as expected.

The simplest way to address this would be to only provide a TypedEncoder[WrappedArray] by default (instead of Vector).

frameless.Job[A] enhancements

  • Update Job to accept a SparkSession instead of just a SparkContext (2.0 enhancement)
  • The idea of explicitly wrapping Spark actions in a Job[_] allows for some interesting additions. For one, we could run a Job with a timeout option, i.e. run a job and either return the result or fail if it takes more than, say, 20 minutes: runOrTimeOut(d: Duration): Validated[Error, A].
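
A hedged sketch of the timeout idea as a standalone helper (names are illustrative; note that this does not cancel the underlying Spark job on timeout):

import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import cats.data.Validated

// Runs the given thunk and either returns its result or fails if it exceeds the timeout.
def runOrTimeOut[A](run: () => A, d: Duration): Validated[Throwable, A] =
  try Validated.valid(Await.result(Future(run()), d))
  catch { case e: TimeoutException => Validated.invalid(e) }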

Bug: Self join doesn't seem right

Let me show this with an example:

case class Foo(i: Int, b: Long, c: String)
val f = TypedDataset.create( Foo(1,2,"a") :: Foo(2,2,"b") :: Foo(3,2,"b") :: Nil )
f.show().run
//+---+---+---+
//|  i|  b|  c|
//+---+---+---+
//|  1|  2|  a|
//|  2|  2|  b|
//|  3|  2|  b|
//+---+---+---+

f.join(f, f('c), f('c)).show().run

//+-------+-------+
//|     _1|     _2|
//+-------+-------+
//|[1,2,a]|[1,2,a]|
//|[1,2,a]|[2,2,b]|              <--- This doesn't look right; why are 'a' and 'b' joined together?
//|[1,2,a]|[3,2,b]|
//|[2,2,b]|[1,2,a]|
//|[2,2,b]|[2,2,b]|
//|[2,2,b]|[3,2,b]|
//|[3,2,b]|[1,2,a]|
//|[3,2,b]|[2,2,b]|
//|[3,2,b]|[3,2,b]|
//+-------+-------+

Here is how this behaves with a vanilla Spark Dataset:

val t = f.dataset
//t: org.apache.spark.sql.Dataset[Foo] = [i: int, b: bigint ... 1 more field]

t.join(t, t("c") === t("c")).show()
//16/11/25 10:35:17 WARN Column: Constructing trivially true equals predicate, 'c#5 = c#5'. Perhaps you need to use aliases.
//+---+---+---+---+---+---+
//|  i|  b|  c|  i|  b|  c|
//+---+---+---+---+---+---+
//|  1|  2|  a|  1|  2|  a|
//|  2|  2|  b|  3|  2|  b|
//|  2|  2|  b|  2|  2|  b|
//|  3|  2|  b|  3|  2|  b|
//|  3|  2|  b|  2|  2|  b|
//+---+---+---+---+---+---+

Add UDF support for frameless.TypedColumn

This is an important and popular feature of the Spark (DataFrame/Dataset) API. It would be great if we could have a typesafe version of it.

Here is a very rough first version (just to stir up the discussion):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

def fudf[T, U: TypeTag, G: TypeTag](f: U => G)
    (implicit
       uenc: TypedEncoder[U],
       ueng: TypedEncoder[G]): (TypedColumn[T, U] => TypedColumn[T, G]) =
      (u: TypedColumn[T, U]) => {
        val uf = udf(f)
        val x: Column = uf(u.untyped)
        new TypedColumn[T, G](x)
      }

Assuming the syntax from #50 is available:

scala> val e = TypedDataset.create[(Int, String, Long)]( (1,"a",2L) :: (2, "b", 4L) :: (2, "b", 1L) :: Nil )

scala> val testUdf = fudf[(Int,String,Long),Int,Int]( (_:Int) + 1 )
o: frameless.TypedColumn[(Int, String, Long),Int] => frameless.TypedColumn[(Int, String, Long),Int] = <function1>

scala> e.select( testUdf(e('_1)) )
res4: frameless.TypedDataset[Int] = [UDF(_1): int]

scala> e.select( testUdf(e('_1)) ).collect().run()
res6: Seq[Int] = WrappedArray(2, 3, 3)

Bug: Applying aggregation functions on select() to some fields and simple projection to other causes runtime errors

case class X(i: Int, j: Int)
val f = TypedDataset.create(X(1,1) :: X(1,2) :: X(1,3) :: Nil)

f.select( sum(f('i)), f('j) )  // we apply sum on i and simple project on j

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`j`' is not an aggregate function. Wrap '(coalesce(CAST(sum(CAST(`i` AS BIGINT)) AS BIGINT), CAST(0 AS BIGINT)) AS `coalesce(sum(i), 0)`)' in windowing function(s) or wrap '`j`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [coalesce(cast(sum(cast(i#2 as bigint)) as bigint), cast(0 as bigint)) AS coalesce(sum(i), 0)#44L, j#3]
+- LocalRelation [i#2, j#3]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:222)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:257)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:257)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:257)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
  at frameless.TypedDataset$selectMany$.applyProduct(TypedDataset.scala:490)
  at frameless.TypedDataset.select(TypedDataset.scala:363)
  ... 42 elided

shapeless lens

Hi there.

case class Address(street : String, city : String)
case class Person(name : String, age : Int, address : Address)

val people = Seq(Person("Mary", 32, Address("Southover Street", "Brighton")))
val ds = TypedDataset.create(people)

How can I get a TypedColumn for 'address.street'?
Can I use a Lens from shapeless, or is there another way?
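
One possible answer with the existing API is colMany (mentioned in the alternate column referencing issue above); a minimal sketch, assuming an implicit SparkSession is in scope:

val street = ds.colMany('address, 'street) // TypedColumn[Person, String]
ds.select(street).collect().run()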

Bug: Filtering on Option fields does not work for None.

The example below shows the problem. The expected behavior is that selecting the rows that have a column equal to None should essentially translate into filtering the rows that have that field as null.

case class Foo(i: Option[Boolean])
//defined class Foo

val f = TypedDataset.create( Foo(Some(true)) :: Foo(Some(false)) :: Foo(None) :: Nil )
//f: frameless.TypedDataset[Foo] = [i: boolean]

f.show().run
+-----+
|    i|
+-----+
| true|
|false|
| null|
+-----+
//f: Unit = ()

f.filter( f('i) === Some(true) ).show().run
+----+
|   i|
+----+
|true|
+----+

f.filter( f('i) === Some(false) ).show().run
+-----+
|    i|
+-----+
|false|
+-----+

f.filter( f('i) === None ).show().run
+---+
|  i|
+---+
+---+

udfs: detect closure problems at compile time

Yesterday I had to deal with an issue involving UDFs.
Spark barfed out a non-serializable exception because I had defined my UDF like a closure: I referred to an Int variable that was defined in the block containing the definition of the UDF.
This compiled fine, but exploded at runtime.
The solution was to pass that Int to the UDF in the select statement as a lit.
Would it be possible for Frameless to make sure that these things don't compile?
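
To make the problem concrete, a hedged sketch (Rec and the values are made up, and an implicit SparkSession is assumed):

import frameless.TypedDataset
import frameless.functions.lit

case class Rec(i: Int)

val ds = TypedDataset.create(Seq(Rec(1), Rec(2)))

val threshold: Int = 10 // defined in the enclosing block

// Compiles, but the captured enclosing scope may fail to serialize at runtime:
val capturingUdf = ds.makeUDF((x: Int) => x + threshold)

// The workaround described above: pass the value explicitly as a literal column.
val explicitUdf = ds.makeUDF((x: Int, t: Int) => x + t)
ds.select(explicitUdf(ds('i), lit(threshold)))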

Bug: Runtime error when selecting a composite field (a field that is also a struct())

I am getting this error when I try to select a field that is also a case class (Catalyst struct). Here is a simple example that shows this:

scala> case class Foo(i: Int, j: Boolean)
defined class Foo

scala> case class Bar(i: Foo, x: Int)
defined class Bar

scala> val t = TypedDataset.create( Bar(Foo(1,true),2) :: Nil )
t: frameless.TypedDataset[Bar] = [i: struct<i: int, j: boolean>, x: int]

scala> t.select(t('i))
org.apache.spark.sql.AnalysisException: Try to map struct<_1:struct<i:int,j:boolean>> to Tuple2, but failed as the number of fields does not line up.;
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveDeserializer$$fail(Analyzer.scala:1921)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveDeserializer$$validateTopLevelTupleFields(Analyzer.scala:1938)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$32$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:1912)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$32$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:1904)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionDown$1(QueryPlan.scala:157)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:167)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$4.apply(QueryPlan.scala:176)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:176)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:145)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$32.applyOrElse(Analyzer.scala:1904)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$32.applyOrElse(Analyzer.scala:1900)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$.apply(Analyzer.scala:1900)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$.apply(Analyzer.scala:1899)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:244)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:210)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
  at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
  at org.apache.spark.sql.Dataset.as(Dataset.scala:359)
  at frameless.TypedDataset.as(TypedDataset.scala:29)
  at frameless.TypedDataset.select(TypedDataset.scala:284)
  ... 42 elided

Bug: Problem with custom encoders for Vector[Custom]

@kanterov @OlivierBlanvillain Found an issue encoding (say) Vector[Food] in our tests.

Here is a minimal test to reproduce the error

test("test me ....") {
    def prop[
    A: TypedEncoder,
    B: TypedEncoder,
    C: TypedEncoder,
    D: TypedEncoder](data: Vector[X4[A, B, C, D]]): Prop = {
      val ds = TypedDataset.create(data)
      ds.show().run()
      true
    }
    check(forAll(prop[X2[Int, Int], Int, Boolean, Vector[Food]] _))
  }

Running this gives the following runtime error:

RuntimeException was thrown during property evaluation.
  Message: Error while encoding: java.lang.ClassCastException: frameless.Pasta$ cannot be cast to java.lang.Integer
named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b) AS a#20
+- named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b)
   :- a
   :- input[0, frameless.X4, false].a.a
   :  +- input[0, frameless.X4, false].a
   :     +- input[0, frameless.X4, false]
   :- b
   +- input[0, frameless.X4, false].a.b
      +- input[0, frameless.X4, false].a
         +- input[0, frameless.X4, false]

input[0, frameless.X4, false].b AS b#21
+- input[0, frameless.X4, false].b
   +- input[0, frameless.X4, false]

input[0, frameless.X4, false].c AS c#22
+- input[0, frameless.X4, false].c
   +- input[0, frameless.X4, false]

newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS d#23
+- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
   +- input[0, frameless.X4, false].d
      +- input[0, frameless.X4, false]

  Occurred when passed generated values (
    arg0 = Vector(X4(X2(-1,-1694336440),1160431481,true,Vector(Pasta)))
  )
ScalaTestFailureLocation: frameless.CreateTests$$anonfun$2 at (CreateTests.scala:38)
org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException: RuntimeException was thrown during property evaluation.
  Message: Error while encoding: java.lang.ClassCastException: frameless.Pasta$ cannot be cast to java.lang.Integer
named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b) AS a#20
+- named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b)
   :- a
   :- input[0, frameless.X4, false].a.a
   :  +- input[0, frameless.X4, false].a
   :     +- input[0, frameless.X4, false]
   :- b
   +- input[0, frameless.X4, false].a.b
      +- input[0, frameless.X4, false].a
         +- input[0, frameless.X4, false]

input[0, frameless.X4, false].b AS b#21
+- input[0, frameless.X4, false].b
   +- input[0, frameless.X4, false]

input[0, frameless.X4, false].c AS c#22
+- input[0, frameless.X4, false].c
   +- input[0, frameless.X4, false]

newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS d#23
+- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
   +- input[0, frameless.X4, false].d
      +- input[0, frameless.X4, false]

  Occurred when passed generated values (
    arg0 = Vector(X4(X2(-1,-1694336440),1160431481,true,Vector(Pasta)))
  )
	at org.scalatest.prop.Checkers$.doCheck(Checkers.scala:428)
	at org.scalatest.prop.Checkers$class.check(Checkers.scala:346)
	at frameless.TypedDatasetSuite.check(TypedDatasetSuite.scala:8)
	at org.scalatest.prop.Checkers$class.check(Checkers.scala:357)
	at frameless.TypedDatasetSuite.check(TypedDatasetSuite.scala:8)
	at frameless.CreateTests$$anonfun$2.apply$mcV$sp(CreateTests.scala:38)
	at frameless.CreateTests$$anonfun$2.apply(CreateTests.scala:22)
	at frameless.CreateTests$$anonfun$2.apply(CreateTests.scala:22)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
	at org.scalatest.FunSuite.run(FunSuite.scala:1555)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
	at org.scalatest.tools.Runner$.run(Runner.scala:883)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: frameless.Pasta$ cannot be cast to java.lang.Integer
named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b) AS a#20
+- named_struct(a, input[0, frameless.X4, false].a.a, b, input[0, frameless.X4, false].a.b)
   :- a
   :- input[0, frameless.X4, false].a.a
   :  +- input[0, frameless.X4, false].a
   :     +- input[0, frameless.X4, false]
   :- b
   +- input[0, frameless.X4, false].a.b
      +- input[0, frameless.X4, false].a
         +- input[0, frameless.X4, false]

input[0, frameless.X4, false].b AS b#21
+- input[0, frameless.X4, false].b
   +- input[0, frameless.X4, false]

input[0, frameless.X4, false].c AS c#22
+- input[0, frameless.X4, false].c
   +- input[0, frameless.X4, false]

newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS d#23
+- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
   +- input[0, frameless.X4, false].d
      +- input[0, frameless.X4, false]

	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
	at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:421)
	at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:421)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:421)
	at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:380)
	at frameless.TypedDataset$.create(TypedDataset.scala:471)
	at frameless.CreateTests$$anonfun$2.frameless$CreateTests$$anonfun$$prop$1(CreateTests.scala:28)
	at frameless.CreateTests$$anonfun$2$$anonfun$apply$mcV$sp$1.apply(CreateTests.scala:38)
	at frameless.CreateTests$$anonfun$2$$anonfun$apply$mcV$sp$1.apply(CreateTests.scala:38)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
	at org.scalacheck.Prop$$anonfun$forAllShrink$1$$anonfun$3.apply(Prop.scala:714)
	at org.scalacheck.Prop$$anonfun$forAllShrink$1$$anonfun$3.apply(Prop.scala:714)
	at org.scalacheck.Prop$.secure(Prop.scala:458)
	at org.scalacheck.Prop$$anonfun$forAllShrink$1.org$scalacheck$Prop$$anonfun$$result$1(Prop.scala:714)
	at org.scalacheck.Prop$$anonfun$forAllShrink$1.apply(Prop.scala:751)
	at org.scalacheck.Prop$$anonfun$forAllShrink$1.apply(Prop.scala:708)
	at org.scalacheck.Prop$$anonfun$apply$5.apply(Prop.scala:293)
	at org.scalacheck.Prop$$anonfun$apply$5.apply(Prop.scala:292)
	at org.scalacheck.PropFromFun.apply(Prop.scala:21)
	at org.scalacheck.Test$.org$scalacheck$Test$$workerFun$1(Test.scala:323)
	at org.scalacheck.Test$$anonfun$2.apply(Test.scala:352)
	at org.scalacheck.Test$$anonfun$2.apply(Test.scala:352)
	at org.scalacheck.Platform$.runWorkers(Platform.scala:40)
	at org.scalacheck.Test$.check(Test.scala:352)
	at org.scalatest.prop.Checkers$.doCheck(Checkers.scala:372)
	... 56 more
Caused by: java.lang.ClassCastException: frameless.Pasta$ cannot be cast to java.lang.Integer
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
	at org.apache.spark.sql.catalyst.util.GenericArrayData.getInt(GenericArrayData.scala:62)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
	... 88 more

'csum' and 'csumByKey' should use a CommutativeMonoid

According to the Spark docs, the reduce, reduceByKey, fold and foldByKey operations on RDDs should be passed a binary commutative and associative operation. This is an excerpt from the Spark 2.1.0 code:

/**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
   */
def reduce(f: (T, T) => T): T = ...

So constraining the type to a Monoid is not enough, as this only guarantees associativity but not commutativity. To be safer, these methods should instead be constrained to a cats.kernel.CommutativeMonoid.

It is also arguable whether they need a Monoid at all, since they do not make use of the empty operation; a CommutativeSemigroup could potentially suffice...
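
A hedged sketch of the proposed constraint, written as a standalone function with illustrative names rather than the actual frameless signature:

import cats.kernel.CommutativeMonoid
import org.apache.spark.rdd.RDD

// fold requires a commutative, associative operation; CommutativeMonoid guarantees both.
def csum[A: CommutativeMonoid](rdd: RDD[A]): A = {
  val M = CommutativeMonoid[A]
  rdd.fold(M.empty)(M.combine)
}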

Null value in any field makes the entire row return null

In Frameless:

TypedDataset.create((1,"a")::(1,null.asInstanceOf[String])::Nil).collect().run
res: Seq[(Int, String)] = WrappedArray((1,a), null)

Here is the behavior in vanilla Spark:

spark.createDataset((1,"a")::(1,null.asInstanceOf[String])::Nil).collect()
res: Array[(Int, String)] = Array((1,a), (1,null))

Here are some differences in the serializers they use:

scala> TypedExpressionEncoder[(Int,Long)].serializer
res19: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(input[0, scala.Tuple2, false]._1 AS _1#28, input[0, scala.Tuple2, false]._2 AS _2#29L)

scala> org.apache.spark.sql.Encoders.product[(Int,Long)].asInstanceOf[ExpressionEncoder[(Int, Long)]].serializer
res20: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#30, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#31L)

TypedEncoder for Seq

I know that it is not generally advised to provide a typeclass instance for a type high up in the hierarchy like Seq. However, it would be very convenient to have such instances defined somewhere (e.g. behind an extra import) for people who are migrating an existing Spark project to Frameless.

Currently, Spark supports Encoders for Seqs of primitives, Strings and Products.
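
In the meantime, a hedged workaround sketch (not an official frameless instance): encode Seq[A] by injecting it into Vector[A], for which an encoder already exists.

import frameless.Injection

// Derives TypedEncoder[Seq[A]] through frameless's Injection-based encoder,
// assuming a TypedEncoder[Vector[A]] is available for A.
implicit def seqInjection[A]: Injection[Seq[A], Vector[A]] =
  Injection(_.toVector, v => v)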

java.lang.AbstractMethodError: org.apache.spark.sql.catalyst.expressions.Expression.doGenCode

Hi there.

In my project I'm using these versions:

val sparkV = "2.0.1"
val scalaV = "2.11.8"
val framelessV = "0.1.0"

When I tried to do this:

sealed trait Gender
case object Male extends Gender
case object Female extends Gender
case object Other extends Gender

implicit val genderToInt: frameless.Injection[Gender, Int] = frameless.Injection[Gender, Int](
  {
    case Male   => 1
    case Female => 2
    case Other  => 3
  },
  {
    case 1 => Male
    case 2 => Female
    case 3 => Other
  }
)

case class Person(age: Int, gender: Gender)
val people = Seq(Person(1,Male))
implicit val sqlContext = spark.sqlContext
TypedDataset.create(people)

then I got this error:

java.lang.AbstractMethodError: org.apache.spark.sql.catalyst.expressions.Expression.doGenCode(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodegenContext;Lorg/apache/spark/sql/catalyst/expressions/codegen/ExprCode;)Lorg/apache/spark/sql/catalyst/expressions/codegen/ExprCode;
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:744)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:744)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:744)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:299)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:363)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:356)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:825)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection$lzycompute(ExpressionEncoder.scala:252)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection(ExpressionEncoder.scala:252)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:421)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:421)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:421)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:380)
at frameless.TypedDataset$.create(TypedDataset.scala:269)
... 52 elided

Can you help me?
