library(dplyr)
library(sparklyr)
library(nycflights13)
# local version
# Local data frame: dplyr's summarize() lets a later expression (`ratio`)
# refer to aggregates computed earlier in the same call (`mean_dep_delay`,
# `count_num`).  This is the expected/reference result.
flights %>%
group_by(carrier) %>%
summarize(count_num = n(),
mean_dep_delay = mean(dep_delay, na.rm = TRUE),
ratio = mean_dep_delay / count_num) %>%
arrange(carrier)
#> # A tibble: 16 x 4
#> carrier count_num mean_dep_delay ratio
#> <chr> <int> <dbl> <dbl>
#> 1 9E 18460 16.7 0.000906
#> 2 AA 32729 8.59 0.000262
#> 3 AS 714 5.80 0.00813
#> 4 B6 54635 13.0 0.000238
#> 5 DL 48110 9.26 0.000193
#> 6 EV 54173 20.0 0.000368
#> 7 F9 685 20.2 0.0295
#> 8 FL 3260 18.7 0.00574
#> 9 HA 342 4.90 0.0143
#> 10 MQ 26397 10.6 0.000400
#> 11 OO 32 12.6 0.393
#> 12 UA 58665 12.1 0.000206
#> 13 US 20536 3.78 0.000184
#> 14 VX 5162 12.9 0.00249
#> 15 WN 12275 17.7 0.00144
#> 16 YV 601 19.0 0.0316
# Spark version
sc <- spark_connect(master = "local")
flights_sdf <- copy_to(sc, flights, "flights")
# Same pipeline against the Spark backend — this FAILS (reproduction of the
# bug).  dbplyr translates the whole summarize() into a single SELECT, so
# `ratio` references the aliases `mean_dep_delay` and `count_num` inside the
# very SELECT that defines them; Spark SQL cannot resolve aliases that way,
# hence the AnalysisException below ("cannot resolve '`mean_dep_delay`'").
# Note also that mean() is written without na.rm here, which triggers the
# "Missing values are always removed in SQL" warning.
flights_sdf %>%
group_by(carrier) %>%
summarize(count_num = n(),
mean_dep_delay = mean(dep_delay),
ratio = mean_dep_delay / count_num) %>%
collect()
#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`mean_dep_delay`' given input columns: [dest, dep_delay, distance, dep_time, minute, carrier, origin, sched_arr_time, month, arr_time, day, flight, sched_dep_time, time_hour, arr_delay, air_time, hour, tailnum, year]; line 1 pos 81;
#> 'Aggregate [carrier#38], [carrier#38, count(1) AS count_num#447L, avg(dep_delay#34) AS mean_dep_delay#448, ('mean_dep_delay / 'count_num) AS ratio#449]
#> +- SubqueryAlias flights
#> +- LogicalRDD [year#29, month#30, day#31, dep_time#32, sched_dep_time#33, dep_delay#34, arr_time#35, sched_arr_time#36, arr_delay#37, carrier#38, flight#39, tailnum#40, origin#41, dest#42, air_time#43, distance#44, hour#45, minute#46, time_hour#47]
#>
#> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformUp$1.apply(TreeNode.scala:289)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformUp$1.apply(TreeNode.scala:289)
#> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:306)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:306)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:289)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1$1.apply(QueryPlan.scala:293)
#> at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
#> at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
#> at scala.collection.immutable.List.foreach(List.scala:381)
#> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
#> at scala.collection.immutable.List.map(List.scala:285)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:293)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$6.apply(QueryPlan.scala:298)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
#> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
#> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
#> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
#> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
#> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#> at java.lang.reflect.Method.invoke(Method.java:498)
#> at sparklyr.Invoke$.invoke(invoke.scala:102)
#> at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
#> at sparklyr.StreamHandler$.read(stream.scala:62)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:52)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
#> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#> at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
#> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
#> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
#> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
#> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
#> at java.lang.Thread.run(Thread.java:748)
# Spark workaround
# Spark-friendly workaround: compute the aggregates first, then derive
# `ratio` in a separate mutate() step.  dbplyr renders the mutate() as an
# outer SELECT wrapped around the aggregation subquery, so Spark never has
# to resolve an alias inside the SELECT that defines it (the cause of the
# AnalysisException above).  This replaces the previous trick of
# mutate()-ing group-constant columns and then summarize()-ing their means,
# which did redundant work and returned count_num as a double (32.0 instead
# of a count).  Writing na.rm = TRUE explicitly matches the local dplyr
# semantics and silences the "Missing values are always removed in SQL"
# warnings.
flights_sdf %>%
  group_by(carrier) %>%
  summarize(count_num = n(),
            mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
  mutate(ratio = mean_dep_delay / count_num) %>%
  arrange(carrier) %>%
  collect()
#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning
#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning
#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning
#> Warning: Missing values are always removed in SQL.
#> Use `avg(x, na.rm = TRUE)` to silence this warning
#> # A tibble: 16 x 4
#> carrier count_num mean_dep_delay ratio
#> <chr> <dbl> <dbl> <dbl>
#> 1 9E 18460 16.7 0.000906
#> 2 AA 32729 8.59 0.000262
#> 3 AS 714 5.80 0.00813
#> 4 B6 54635 13.0 0.000238
#> 5 DL 48110 9.26 0.000193
#> 6 EV 54173 20.0 0.000368
#> 7 F9 685 20.2 0.0295
#> 8 FL 3260 18.7 0.00574
#> 9 HA 342 4.90 0.0143
#> 10 MQ 26397 10.6 0.000400
#> 11 OO 32.0 12.6 0.393
#> 12 UA 58665 12.1 0.000206
#> 13 US 20536 3.78 0.000184
#> 14 VX 5162 12.9 0.00249
#> 15 WN 12275 17.7 0.00144
#> 16 YV 601 19.0 0.0316
> devtools::session_info()
Session info ---------------------------------------------------------------------
setting value
version R version 3.4.1 (2017-06-30)
system x86_64, linux-gnu
ui RStudio (1.1.383)
language (EN)
collate en_US.UTF-8
tz <NA>
date 2018-01-08
Packages -------------------------------------------------------------------------
package * version date source
assertthat 0.2.0 2017-04-11 CRAN (R 3.4.1)
backports 1.1.2 2017-12-13 cran (@1.1.2)
base * 3.4.1 2017-09-07 local
base64enc 0.1-3 2015-07-28 CRAN (R 3.4.1)
bindr 0.1 2016-11-13 CRAN (R 3.4.1)
bindrcpp * 0.2 2017-06-17 CRAN (R 3.4.1)
broom 0.4.3 2017-11-20 cran (@0.4.3)
callr 1.0.0 2016-06-18 CRAN (R 3.4.1)
cli 1.0.0 2017-11-05 CRAN (R 3.4.1)
clipr 0.4.0 2017-11-03 CRAN (R 3.4.1)
compiler 3.4.1 2017-09-07 local
config 0.2 2016-08-02 CRAN (R 3.4.1)
crayon 1.3.4 2017-09-16 CRAN (R 3.4.1)
datasets * 3.4.1 2017-09-07 local
DBI 0.7 2017-06-18 CRAN (R 3.4.1)
dbplyr 1.2.0 2018-01-03 cran (@1.2.0)
devtools 1.13.4 2017-11-09 CRAN (R 3.4.1)
digest 0.6.13 2017-12-14 cran (@0.6.13)
dplyr * 0.7.4 2017-09-28 CRAN (R 3.4.1)
evaluate 0.10.1 2017-06-24 CRAN (R 3.4.1)
foreign 0.8-69 2017-06-22 CRAN (R 3.4.1)
glue 1.2.0 2017-10-29 CRAN (R 3.4.1)
graphics * 3.4.1 2017-09-07 local
grDevices * 3.4.1 2017-09-07 local
grid 3.4.1 2017-09-07 local
htmltools 0.3.6 2017-04-28 CRAN (R 3.4.1)
httpuv 1.3.5 2017-07-04 CRAN (R 3.4.1)
httr 1.3.1 2017-08-20 CRAN (R 3.4.1)
jsonlite 1.5 2017-06-01 CRAN (R 3.4.1)
knitr 1.17 2017-08-10 CRAN (R 3.4.1)
lattice 0.20-35 2017-03-25 CRAN (R 3.4.1)
lazyeval 0.2.1 2017-10-29 CRAN (R 3.4.1)
magrittr 1.5 2014-11-22 CRAN (R 3.4.1)
memoise 1.1.0 2017-04-21 CRAN (R 3.4.1)
methods * 3.4.1 2017-09-07 local
mime 0.5 2016-07-07 CRAN (R 3.4.1)
mnormt 1.5-5 2016-10-15 CRAN (R 3.4.1)
nlme 3.1-131 2017-02-06 CRAN (R 3.4.1)
nycflights13 * 0.2.2 2017-01-27 CRAN (R 3.4.1)
openssl 0.9.9 2017-11-10 cran (@0.9.9)
parallel 3.4.1 2017-09-07 local
pillar 1.0.1 2017-11-27 cran (@1.0.1)
pkgconfig 2.0.1 2017-03-21 CRAN (R 3.4.1)
plyr 1.8.4 2016-06-08 CRAN (R 3.4.1)
psych 1.7.8 2017-09-09 CRAN (R 3.4.1)
purrr 0.2.4 2017-10-18 CRAN (R 3.4.1)
R6 2.2.2 2017-06-17 CRAN (R 3.4.1)
Rcpp 0.12.14 2017-11-23 cran (@0.12.14)
reprex * 0.1.1 2017-01-13 CRAN (R 3.4.1)
reshape2 1.4.3 2017-12-11 cran (@1.4.3)
rlang 0.1.6 2017-12-21 cran (@0.1.6)
rmarkdown 1.7 2017-11-10 CRAN (R 3.4.1)
rprojroot 1.3-2 2018-01-03 cran (@1.3-2)
rstudioapi 0.7 2017-09-07 CRAN (R 3.4.1)
shiny 1.0.5 2017-08-23 CRAN (R 3.4.1)
sparklyr * 0.7.0-9106 2018-01-08 Github (rstudio/sparklyr@41d145a)
stats * 3.4.1 2017-09-07 local
stringi 1.1.5 2017-04-07 CRAN (R 3.4.1)
stringr 1.2.0 2017-02-18 CRAN (R 3.4.1)
tibble 1.4.1 2017-12-25 cran (@1.4.1)
tidyr 0.7.2 2017-10-16 CRAN (R 3.4.1)
tools 3.4.1 2017-09-07 local
utf8 1.1.3 2018-01-03 cran (@1.1.3)
utils * 3.4.1 2017-09-07 local
whisker 0.3-2 2013-04-28 CRAN (R 3.4.1)
withr 2.1.1 2017-12-19 cran (@2.1.1)
xtable 1.8-2 2016-02-05 CRAN (R 3.4.1)
yaml 2.1.14 2016-11-12 CRAN (R 3.4.1)