
Spark DataFrame Cheat Sheet

Yuhao's cheat sheet for the Apache Spark DataFrame API. Contributions are welcome.

Core Concepts

In the Scala API of Spark 2.x, DataFrame is simply a type alias of Dataset[Row].
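
A minimal sketch of what the alias means in practice (assuming Spark 2.x, a started SparkSession named spark, and a Person case class defined at top level):

    import org.apache.spark.sql.{DataFrame, Dataset}
    import spark.implicits._

    // org.apache.spark.sql declares: type DataFrame = Dataset[Row]
    val ds: Dataset[Person] = Seq(Person("Andy", 32)).toDS() // typed view
    val df: DataFrame = ds.toDF()                            // untyped view, i.e. Dataset[Row]
    val typedAgain: Dataset[Person] = df.as[Person]          // back to the typed view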

Quick Reference

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .master("local")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Creation

  • create Dataset from Seq

      // vertically
      spark.createDataset(Seq(1, 2))
    
      // horizontally
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

      val rows = spark.sparkContext.parallelize(Seq(Row.fromSeq(Seq(1, 2))))
      val schema = StructType(Seq("col1", "col2").map(col => StructField(col, IntegerType, nullable = false)))
      spark.createDataFrame(rows, schema).show()
    
  • create Dataset from range

      spark.createDataset(1 to 10)
    
  • create Dataset from array of tuples

      spark.createDataset(Array((1, "Tom"), (2, "Jerry"))).toDF("id", "name")
    
      // rename all columns of an existing DataFrame at once
      val newNames = Seq("id", "x1", "x2", "x3")
      val dfRenamed = df.toDF(newNames: _*)
    
  • Seq to Dataset

      List("a").toDS()
      Seq(1, 3, 5).toDS()
    
      import spark.implicits._
      Seq.empty[(String, Int)].toDF("k", "v")
    
  • create Dataset from Seq of case class

    // define case class Person(name: String, age: Long) outside of the method, otherwise Spark cannot derive an implicit Encoder for it

      val caseClassDS = Seq(Person("Andy", 32)).toDS()
      // or equivalently
      val caseClassDS2 = spark.createDataset(Seq(Person("Andy", 32), Person("Andy2", 33)))
    
  • create Dataset from RDD

      import spark.implicits._
      val rdd = sc.parallelize(1 to 5)
      spark.createDataset(rdd)
    
      import spark.implicits._
      val rdd = sc.parallelize(1 to 5)
      rdd.toDS().show()
      rdd.toDF().show()
    
      // generic pattern for an RDD[Row]: pattern match each Row into a tuple
      val df = rdd.map {
        case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
      }.toDF("col1_name", ..., "colN_name")
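
    A concrete instance of the pattern above, as a sketch (the two-column schema and the values are illustrative):

      import org.apache.spark.sql.Row
      import spark.implicits._

      // hypothetical RDD[Row] holding (String, Long) rows
      val rowRdd = sc.parallelize(Seq(Row("Tom", 1L), Row("Jerry", 2L)))
      val named = rowRdd.map {
        case Row(name: String, id: Long) => (name, id)
      }.toDF("name", "id")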
    

    // define case class Person(name: String, age: Long) outside of the method, otherwise Spark cannot derive an implicit Encoder for it

      val peopleDF = spark.sparkContext
       .textFile("examples/src/main/resources/people.txt")
       .map(_.split(","))
       .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
       .toDF()
    
  • create DataFrame from RDD with schema

      // excerpt from Spark ML internals (FPGrowth); $(...) reads a Param value
      val rows = freqItemsets.map(f => Row(f.items, f.freq))
      val schema = StructType(Seq(
        StructField("items", dataset.schema($(featuresCol)).dataType, nullable = false),
        StructField("freq", LongType, nullable = false)))
      val frequentItems = dataset.sparkSession.createDataFrame(rows, schema)
    
      val schema = StructType(
        StructField("k", StringType, true) ::
        StructField("v", IntegerType, false) :: Nil)
      spark.createDataFrame(sc.emptyRDD[Row], schema).show()
    
  • create Dataset from file

      // from json
      spark.read.json("examples/src/main/resources/people.json")

      val path = "examples/src/main/resources/people.json"
      val peopleDS = spark.read.json(path).as[Person]

      // from text file
      import spark.implicits._
      val dataset = spark.read.textFile("data/mllib/sample_fpgrowth.txt")
        .map(t => t.split(" ")).toDF("features")

      // from csv
      val df = spark.read
        .format("csv")
        .option("header", "true") // read the header row
        .option("mode", "DROPMALFORMED")
        .csv("csv/file/path")

Select

  • select with col function

      import org.apache.spark.sql.functions._
      // excerpt from Spark ML internals; $(...) reads a Param value
      dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
        case Row(label: Double, features: Vector) =>
          LabeledPoint(label, features)
      }
    
      // average
      dataset.select(avg(inputCol)).as[Double].first()

      // median (or any other percentile)
      filtered.stat.approxQuantile(inputCol, Array(0.5), 0.001)

      // check whether the DataFrame is empty
      df.rdd.isEmpty
    
      // select an array of columns (two equivalent forms)
      df.select(cols.head, cols.tail: _*)

      df.select(cols.map(col): _*)
    
  • select with type

      output.select("features").as[Vector].collect()
    
  • select with basic calculation

      import spark.implicits._
      df.select($"name", $"age" + 1).show()
    
  • select from temp view

      df.createOrReplaceTempView("people")
      val sqlDF = spark.sql("SELECT * FROM people")
    
      // Global temporary view is tied to the system-preserved database `global_temp`
      df.createGlobalTempView("people")
      spark.sql("SELECT * FROM global_temp.people").show()

      // Global temporary view is cross-session
      spark.newSession().sql("SELECT * FROM global_temp.people").show()
    
  • select with sql

      val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    
  • Filter

      df.filter($"age" > 21).show()
    
      val ic = col(inputCol)
      val filtered = dataset.select(ic.cast(DoubleType))
        .filter(ic.isNotNull && ic =!= $(missingValue) && !ic.isNaN)
    
    df.filter($"state" === "TX") 
    df.filter("state = 'TX'")
    df.filter($"foo".contains("bar"))
    df.filter(not($"state" === "TX"))
    df.filter($"foo".like("bar"))
    
  • sort

      import org.apache.spark.sql.functions._
    
      df.orderBy(asc("col1"))
    
      df.sort(desc("col2"))
    
  • Rename column

        df.select($"id".alias("x1")).show()
    
      val lookup = Map("id" -> "foo", "value" -> "bar")
      df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
    
      df.withColumnRenamed("_1", "x1")
    
  • change column type (cast)

      val df2 = df.select($"id", col("value").cast(StringType))

      df.selectExpr("cast(year as int) year", "make")
    
  • GroupBy

      df.groupBy("age").count().show()
    
      val dfMax = df.groupBy($"id").agg(max($"value"))
    
      // assumes a case class Record matching df's schema, e.g. case class Record(id: Long, ...)
      df.as[Record]
        .groupByKey(_.id)
        .reduceGroups((x, y) => x).show()
    
  • Window

      import org.apache.spark.sql.functions.{row_number, max, broadcast}
      import org.apache.spark.sql.expressions.Window

      val byHour = Window.partitionBy($"hour").orderBy($"TotalValue".desc)

      val top = df.withColumn("rn", row_number().over(byHour)).where($"rn" === 1)
    
  • join

      df.join(broadcast(dfMax), "id").show()
    
      Leaddetails.join(
          Utm_Master, 
          Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
              && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
              && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
              && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
          "left"
      )
    
  • concat

      df.createOrReplaceTempView("df")
      spark.sql("SELECT CONCAT(id, ' ',  value) as cc FROM df").show()
    
      df.select(concat($"id", lit(" "), $"value"))
    
  • with generic

      private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
          val data = dataset.select($(featuresCol))
          val items = data.where(col($(featuresCol)).isNotNull).rdd.map(r => r.getSeq[T](0).toArray)
          ...
        }
    
  • when

       val ic = col(inputCol)
       outputDF = outputDF.withColumn(outputCol,
         when(ic.isNull, surrogate)
         .when(ic === $(missingValue), surrogate)
         .otherwise(ic)
         .cast(inputType))
    
      val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
      val sqlfunc = udf(coder)
      myDF.withColumn("Code", sqlfunc(col("Amt")))
    
      // map a (1, -1) label to a (1, 0) label
      df.select($"id", when($"label" === 1, 1).otherwise(0).as("label")).show()

      // drop rows containing null or NaN values
      df.na.drop().show()
    
  • cube

      ds.cube($"department", $"gender").agg(Map(
          "salary" -> "avg",
          "age" -> "max"
        ))
    
  • statistics

      df.stat.freqItems(Seq("id")).show()
      df.stat.approxQuantile(...)
      df.stat.bloomFilter(...)
      df.stat.countMinSketch()
    
      // count distinct
      df.select(approx_count_distinct(col("value"))).show()
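
    The elided calls above take explicit parameters. A sketch with typical arguments (column names and thresholds here are illustrative, not from the original):

      // approxQuantile(column, probabilities, relativeError)
      val quartiles = df.stat.approxQuantile("value", Array(0.25, 0.5, 0.75), 0.01)

      // bloomFilter(column, expectedNumItems, falsePositiveProbability)
      val bf = df.stat.bloomFilter("id", 1000L, 0.03)
      bf.mightContain(42L)

      // countMinSketch(column, eps, confidence, seed)
      val cms = df.stat.countMinSketch("id", 0.01, 0.95, 42)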
    

Append

  • append constant

      import org.apache.spark.sql.functions._
      df.withColumn("new_column", lit(10)).show()
    
      df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))
    
      df.select($"*", ($"age" + 10).alias("newAge"))
    

UDF

  • select DataFrame with UDF

      protected def raw2prediction(rawPrediction: Vector): Double = rawPrediction.argmax
      ...
      
      udf(raw2prediction _).apply(col(getRawPredictionCol))
    
      val predictUDF = udf { (features: Any) =>
        bcastModel.value.predict(features.asInstanceOf[Vector])
      }
      dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
    
      // concat two columns with a UDF
      // define a UDF that concatenates the two passed-in string values
      val getConcatenated = udf((first: String, second: String) => first + " " + second)

      // use withColumn to add a new column called newColName
      df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
    

Schema

  • print schema

      df.printSchema()
    
      dataset.schema($(labelCol))
    
      // print the query plan (not the schema)
      df.explain()

      // Spark-internal utilities
      SchemaUtils.checkColumnTypes(schema, inputCol, Seq(DoubleType, FloatType))

      MetadataUtils.getNumClasses(dataset.schema($(labelCol)))
    
  • repartition

      df.repartition($"value")
      df.explain()
    
      df.repartition(2)
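
    Both forms can be combined, and coalesce avoids a full shuffle when only reducing the partition count; a sketch (the partition counts are illustrative):

      // repartition by expression into an explicit number of partitions
      df.repartition(10, $"value")

      // coalesce merges existing partitions without a full shuffle
      df.coalesce(2)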
    
  • custom class

      import spark.implicits._
      class MyObj(val i: Int)
      implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
      // ...
      val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    
      class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
    
      // alias for the type to convert to and from
      type MyObjEncoded = (Int, String, Set[String])
    
      // implicit conversions
      implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
      implicit def fromEncoded(e: MyObjEncoded): MyObj =
          new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
    
      val d = spark.createDataset(Seq[MyObjEncoded](
        new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
        new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
      )).toDF("i", "u", "s").as[MyObjEncoded]
    

Read and write

  • parquet

      df.write.parquet(dataPath)
      ...
      val data = sparkSession.read.format("parquet").load(dataPath)
      val Row(coefficients: Vector, intercept: Double) =
          data.select("coefficients", "intercept").head()
    
  • checkpoint

      df.checkpoint()
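
    Checkpointing requires a checkpoint directory to be configured first; a minimal sketch (the path is illustrative):

      // must be set before calling checkpoint(), otherwise it throws
      spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
      val checkpointed = df.checkpoint()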
    
  • save by key

      // the text source requires the non-partition columns to form a single string column
      df.write.partitionBy("id").text("people")
    
