yotpoltd / metorikku
A simplified, lightweight ETL Framework based on Apache Spark
Home Page: https://yotpoltd.github.io/metorikku/
License: MIT License
Any plans to add Cassandra input support?
Rather than CSV, it would be nice to have fixed-width (non-delimited) input file support. I guess an accompanying spec would be needed in the input YAML to give each column's name and character start-end position, e.g. {columnA:"1-20",columnB:"21-25",columnC:"26-50"}
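A possible shape for such an input, purely a sketch: the fixed_width type and the columns spec below are assumptions, not existing Metorikku options.
inputs:
  input_fw:
    type: fixed_width                # hypothetical input type
    path: inputs/data.txt
    columns:                         # column name -> 1-based character range
      columnA: "1-20"
      columnB: "21-25"
      columnC: "26-50"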
It would be awesome to support more input types.
Part of this would be moving dateRange to be an input type.
Here are some examples for the inputs section in the configuration file that come to mind:
# Sources can be used as types for inputs
sources:
  - name: my_mongo
    type: mongo
    credentials: user1/pass1
  - name: big_mysql
    type: mysql
    db_name: movies_db
    username: theusername
    password: thepassword
# Parquet or JSON table file paths
inputs:
  input_1:
    type: file
    path: parquet/input_1.parquet
  input_2:
    type: source
    name: my_mongo
    collection: movies
  input_3:
    type: source
    name: big_mysql
    query: "SELECT * FROM movies LIMIT 100"
  input_4:
    type: source
    name: big_mysql
    table: movies
  input_5:
    type: file_date_range
    template: parquet/%s/input_1.parquet
    dateRange:
      format: yyyy/MM/dd
      startDate: 2017/09/01
      endDate: 2017/09/20
  input_6:
    type: file_range
    template: parquet/%s/input_1.parquet
    array:
      - development
      - staging
      - production
The Oracle DB password is in plaintext; how can it be hidden?
It would be great to be able to specify something like a curl request to a simple GET HTTP endpoint (and later POST params), with the results feeding into a Metorikku input.
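A possible config shape for this, as a sketch only; the rest input type and its fields are assumptions, nothing like it exists yet:
inputs:
  input_http:
    type: rest                       # hypothetical input type
    url: https://api.example.com/movies
    method: GET                      # POST params could come later
    format: json                     # how to parse the response body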
When using the write function in Metorikku, a cache and count are performed.
These actions should not be mandatory; they should only run when explicitly triggered.
Any plans to add HiveContext input support? That way, if there are already external Hive tables pointing at S3 data, one could skip adding a dataframe input for each of the S3 locations/Hive tables, of which there could be thousands.
Add the ability to output data to a JDBC destination.
Currently only Redshift is supported.
This is a very simple fix that can help with some use cases.
Just switch loading the files to use org.apache.hadoop.fs classes.
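A minimal sketch of what that switch could look like, assuming the goal is to load config and metric files from any Hadoop-supported filesystem (local, HDFS, s3a://):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.io.Source

// Resolving the FileSystem from the path itself lets the same code read
// local files, HDFS, or S3 (s3a://...) URIs.
def readFile(pathStr: String): String = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(new Configuration())
  val in = fs.open(path)
  try Source.fromInputStream(in).mkString finally in.close()
}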
Make the README file more consistent and more robust.
It's now turned on by default for all metrics; we should make it optional and turned off by default.
Any plans to add HBase input support?
In my YAML I have this:
output:
  file:
    dir: s3a://bucket/myvar=$SETME/
How can I run java, passing an argument to replace a value in the YAML? i.e. java -DSETME=ABC -cp "metorikku-standalone.jar" com.yotpo.metorikku.Metorikku -c tests.yaml
or
java -DSETME=XYZ -cp "metorikku-standalone.jar" com.yotpo.metorikku.Metorikku -c tests.yaml
I don't want to change the YAML each time with a different value; I want the java arguments to override part of the YAML value.
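One workaround, assuming no built-in support: keep a template (tests.yaml.template is a hypothetical file containing $SETME) and render it before each run:
# Substitute environment variables into a copy of the YAML before
# launching, rather than editing the file by hand.
export SETME=ABC
envsubst < tests.yaml.template > tests.yaml
java -cp "metorikku-standalone.jar" com.yotpo.metorikku.Metorikku -c tests.yaml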
We need to disable outputting in MetorikkuTester.
This will remove errors such as:
2018-01-29 22:28:36,460 ERROR [main] parquet.ParquetOutputWriter (ParquetOutputWriter.scala:write(31)) - Parquet file configuration were not provided
From the README, we can use:
INSERT INTO table_name (column1, column2, column3, ...) VALUES ($1, $2, $3, ...);
The column ordering does not work; instead we need to use:
INSERT INTO table_name (column1, column2, column3, ...) VALUES (?, ?, ?, ...);
We are currently adding quotes by default in the variables registry;
we need to remove this.
I ran the test runner and got an exception because of a redundant comma (',') in test_settings.json, instead of getting something like "Invalid JSON syntax at row 12 test_settings.json":
spark-submit --class com.yotpo.metorikku.MetorikkuTester metorikku-standalone.jar -t test_settings.json
Exception in thread "main"
Exception: java.lang.InternalError thrown from the UncaughtExceptionHandler in thread "main"
Add the ability to use Kafka as a source and run Metorikku as a streaming job.
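A possible input shape for this; a sketch only, the field names are assumptions rather than an implemented API:
inputs:
  input_stream:
    type: kafka                      # hypothetical streaming input type
    servers: broker1:9092,broker2:9092
    topic: events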
Add more examples and use cases.
Also fix the example tests and write about tests in the README.
Currently the Metorikku tester compares nested arrays using string comparison.
We need to use deep equals.
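A minimal sketch of such a deep comparison, assuming the compared values arrive as possibly nested Scala sequences:
// Recursively compare nested sequences element by element, instead of
// comparing their string representations.
def deepEquals(a: Any, b: Any): Boolean = (a, b) match {
  case (x: Seq[_], y: Seq[_]) =>
    x.size == y.size && x.zip(y).forall { case (u, v) => deepEquals(u, v) }
  case _ => a == b
}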
Support usage of custom UDFs, for example a column sanitizer, a GeoIP lookup, or a user-agent extractor.
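For illustration, registering such a UDF with Spark could look like this; sanitize here is a hypothetical example, not part of Metorikku:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
// Hypothetical sanitizer: trim and lowercase a string column, null-safe.
spark.udf.register("sanitize", (s: String) =>
  Option(s).map(_.trim.toLowerCase).orNull)
// Once registered, metric SQL could call it:
//   SELECT sanitize(user_agent) FROM input_1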
Add a contributing guide to the project.
Write tests for the standalone JAR and for the JAR with spark-submit, and also for different versions of Spark.
Metorikku test runner:
At the end of a run, there is no indication of which tests have passed, which have failed, and for what reason.
Given one overridden option, it will remove the defaults of all other CSV options.
Any plans to add MongoDB input support?
I am trying to run the following spark-submit
/home/ec2-user/spark_home/bin/spark-submit --class com.yotpo.metorikku.Metorikku --deploy-mode cluster --driver-memory 4g --executor-memory 50g --executor-cores 8 --master <master_host>:6066 metorikku.jar -c s3a:///slice/metorikku_test.yaml
It failed with the following error message:
Error: Supplied YAML file not found
Try --help for more information.
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: com.yotpo.metorikku.exceptions.MetorikkuException: Failed to parse config file
at com.yotpo.metorikku.configuration.ConfigurationParser$.parse(ConfigurationParser.scala:20)
at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:15)
at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:12)
at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
... 6 more
When the metorikku_test.yaml config file is placed locally, it works absolutely fine. How can the config file (and the metrics files) be placed in a centralised location such as S3, so they are available at runtime?
I am able to write the output in CSV format when running locally, but doing the same on a Spark cluster produces an error. (Note: it works perfectly fine when the file output type is Parquet.)
/spark-submit --class com.yotpo.metorikku.Metorikku --deploy-mode cluster --driver-memory 4g --executor-memory 50g --executor-cores 8 --conf "spark.eventLog.dir=" --conf spark.eventLog.enabled=true --master spark://Host:Port /metorikku.jar -c
The error message seen is:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException: Failed to write dataFrame: mlau to output: CSV on metric: mlpoc_metric
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:81)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:97)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:88)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.metric.MetricSet.write(MetricSet.scala:88)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:50)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:44)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.metric.MetricSet.run(MetricSet.scala:44)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:23)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:21)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.Metorikku$.runMetrics(Metorikku.scala:21)
at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:18)
at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:12)
at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
... 6 more
Caused by: java.lang.IllegalArgumentException: Illegal pattern component: XXX
at org.apache.commons.lang3.time.FastDateFormat.parsePattern(FastDateFormat.java:577)
at org.apache.commons.lang3.time.FastDateFormat.init(FastDateFormat.java:444)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:437)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:110)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:109)
at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:205)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:136)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:39)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:67)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:644)
at com.yotpo.metorikku.output.writers.csv.CSVOutputWriter.write(CSVOutputWriter.scala:33)
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:77)
... 29 more
Rather than only loading CSV file inputs, it would be nice to also load pipe (|) delimited, tab-delimited, and ASCII x1F (or any specific ASCII code) delimited files.
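Spark's CSV reader already accepts a sep option, so one possible config shape is the following; the options pass-through is an assumption, not an existing Metorikku feature:
inputs:
  input_pipe:
    type: file
    path: inputs/data.txt
    format: csv
    options:
      sep: "|"                       # or "\t" for tab, "\u001F" for ASCII x1F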
Add Scalastyle and a style guide to the project.
Using spark.read.format(inputType).options(...).load(dataPath) instead of the format-specific spark.read.parquet/json/csv calls will allow code reuse and support new formats, like Avro, out of the box.
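A sketch of that refactor:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
// One code path covers parquet/json/csv and any other registered
// DataSource format (e.g. avro) without format-specific branches.
def read(inputType: String, options: Map[String, String], dataPath: String) =
  spark.read.format(inputType).options(options).load(dataPath)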
Support writing to the same output type with multiple names
Can the loading of the various input dataframes be done in parallel? Similarly for metrics, or outputs?
Move the logo to the upper part of the README.
Remove the logo from the project and serve it from GitHub Pages.
ERROR ParquetOutputWriter: Parquet file configuration were not provided
ERROR CassandraOutputWriter: Cassandra configurations were not provided
I am trying to load in a CSV input and have put in the following JSON schema, but I can't see any way for Metorikku to interpret the timestamp format (i.e. yyyy-mm-dd hh:... etc.) of the CSV column in order to convert it to a Spark timestamp:
{
  "$schema": "smallTestSchema",
  "id": "smallTestSchema",
  "type": "object",
  "name": "/",
  "properties": {
    "row_no":     { "id": "smallTestSchema/row_no/",     "type": "string",    "name": "row_no" },
    "start_date": { "id": "smallTestSchema/start_date/", "type": "string",    "name": "start_date" },
    "end_date":   { "id": "smallTestSchema/end_date/",   "type": "string",    "name": "end_date" },
    "unit_name":  { "id": "smallTestSchema/unit_name/",  "type": "timestamp", "name": "unit_name" }
  }
}
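For reference, Spark's CSV reader can do this conversion when given an explicit schema plus a timestampFormat option; whether Metorikku exposes these is the open question here. A sketch, where the column types and the pattern are assumptions:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.getOrCreate()
val schema = StructType(Seq(
  StructField("row_no", StringType),
  StructField("start_date", TimestampType),
  StructField("end_date", TimestampType),
  StructField("unit_name", StringType)))
val df = spark.read
  .option("header", "true")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") // assumed source format
  .schema(schema)
  .csv("inputs/small_test.csv")                     // hypothetical path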
Add a landing page on GitHub Pages for the project, explaining the purpose of Metorikku.
Ability to write output in ORC file format
Currently Metorikku uses a simple YAML config file as input.
We need to be able to override this configuration using CLI params.
/home/ec2-user/spark_home/jars/jdk1.8.0_131/bin/java -Dspark.master=spark://cluster:7077 -cp "/home/ec2-user/spark_home/jars/metorikku-standalone.jar:/home/ec2-user/hadoop285/*" com.yotpo.metorikku.Metorikku -c /home/ec2-user/metorikku/examples/vault.yaml
The Spark version installed on the cluster is 2.3.2.
ls -l /home/ec2-user/hadoop285/*
./home/ec2-user/hadoop285/aws-java-sdk-core-1.10.6.jar
./home/ec2-user/hadoop285/aws-java-sdk-kms-1.10.6.jar
./home/ec2-user/hadoop285/aws-java-sdk-s3-1.10.6.jar
./home/ec2-user/hadoop285/guava-14.0.1.jar
./home/ec2-user/hadoop285/hadoop-annotations-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-auth-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-aws-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-client-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-common-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-hdfs-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-hdfs-client-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-mapreduce-client-app-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-mapreduce-client-common-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-mapreduce-client-core-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-mapreduce-client-jobclient-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-mapreduce-client-shuffle-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-yarn-api-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-yarn-client-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-yarn-common-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-yarn-server-common-2.8.5.jar
./home/ec2-user/hadoop285/hadoop-yarn-server-web-proxy-2.8.5.jar
./home/ec2-user/hadoop285/htrace-core4-4.0.1-incubating.jar
./home/ec2-user/hadoop285/httpclient-4.5.1.jar
./home/ec2-user/hadoop285/httpcore-4.4.3.jar
./home/ec2-user/hadoop285/commons-lang3-3.5.jar
Some of the log output:
18/12/11 04:45:52 INFO CSVOutputWriter: Writing CSV Dataframe to s3a://a/b/c/rv_hub_party.csv
18/12/11 04:45:53 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 148, x.x.209.244, executor 10): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/12/11 04:45:53 ERROR TaskSetManager: Task 0 in stage 8.0 failed 4 times; aborting job
18/12/11 04:45:53 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 151, x.x.209.244, executor 10): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:644)
at com.yotpo.metorikku.output.writers.csv.CSVOutputWriter.write(CSVOutputWriter.scala:33)
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:77)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:97)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:88)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.metric.MetricSet.write(MetricSet.scala:88)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:50)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:44)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.metric.MetricSet.run(MetricSet.scala:44)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:23)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:21)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.Metorikku$.runMetrics(Metorikku.scala:21)
at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:18)
at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:12)
at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
Caused by: java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "main" com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException: Failed to write dataFrame: rv_hub_party to output: CSV on metric: vault_metric
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:81)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:97)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:88)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.metric.MetricSet.write(MetricSet.scala:88)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:50)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:44)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.metric.MetricSet.run(MetricSet.scala:44)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:23)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:21)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.yotpo.metorikku.Metorikku$.runMetrics(Metorikku.scala:21)
at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:18)
at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:12)
at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
Caused by: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:644)
at com.yotpo.metorikku.output.writers.csv.CSVOutputWriter.write(CSVOutputWriter.scala:33)
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:77)
... 23 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 151, x.x.209.244, executor 10): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
... 45 more
Caused by: java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Add the ability to write to multiple outputs while using a Kafka input; currently only Parquet and other Kafka topics are supported, and only one of them at a time.