Comments (16)

thesuperzapper commented on August 23, 2024

@westonsankey can you raise an issue here: https://github.com/epam/parso

thesuperzapper commented on August 23, 2024

Some questions:

  • Can you please provide logs?
  • Does this also happen when writing to other formats? (e.g. Parquet; a quick comparison sketch follows below)
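
For comparison, a minimal sketch of the same write in Parquet (the path is a placeholder; only the output format differs from the CSV write):

# Hypothetical comparison write, using the same DataFrame as the failing CSV job.
df.write.mode("overwrite").parquet("s3://bucket/output-parquet")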

Tagar commented on August 23, 2024

On the 1st one: add the overwrite save mode, e.g.

df.write.format('csv').mode('overwrite').save("s3://bucket/output")

westonsankey commented on August 23, 2024

On the 1st one: add the overwrite save mode, e.g.

df.write.format('csv').mode('overwrite').save("s3://bucket/output")

Forgot to include that in my post, but I am using that option already.

thesuperzapper commented on August 23, 2024

@westonsankey logs?

westonsankey commented on August 23, 2024
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://BUCKET/LOCATION/part-00001-xyz.csv
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:212)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

westonsankey commented on August 23, 2024

Some questions:

  • Can you please provide logs?
  • Does this also happen when writing to other formats? (e.g. Parquet)

Added logs. It happens when writing to other formats as well (tested with Parquet).

thesuperzapper commented on August 23, 2024

@westonsankey can you provide the following:

  1. Information about your environment (Spark Version, EMR, etc.)
  2. The stack trace from when you get the "There are no available bytes in the input stream." message, and how you triggered it.
  3. Does the "File already exists" error always happen, for all files, every time you run it?
  4. Can you read/write smaller SAS files in your Spark environment (smaller, but big enough to cause a split)? [You can force a split with the maxSplitSize option; see the sketch after this list]
  5. For any of the files having issues, can SAS itself read and process all the rows? (i.e. are you confident the file isn't corrupt?)
  6. Can you provide/create a non-confidential SAS file which also displays this issue? [Don't worry too much about this one yet]
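
As a rough sketch of item 4, runnable in the PySpark shell (the paths and split size are placeholders, and I'm assuming maxSplitSize is given in bytes):

# Read a smaller .sas7bdat, but force it to be split by setting maxSplitSize
# well below the file size, then write it back out to exercise the full path.
df = (
    spark.read
    .format("com.github.saurfang.sas.spark")
    .option("maxSplitSize", 10000000)  # ~10 MB splits
    .load("s3://bucket/input/smaller-file.sas7bdat")
)
df.write.format("csv").mode("overwrite").save("s3://bucket/output")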

westonsankey commented on August 23, 2024

@thesuperzapper - I think the files might be corrupt, but I'm doing some more testing/research on my end. I'll provide you with that information if I determine that this is not the case. Appreciate the help thus far.

thesuperzapper commented on August 23, 2024

@westonsankey even if the files are corrupt, perhaps we can emit a nicer error message for such files.

westonsankey commented on August 23, 2024

@thesuperzapper

  1. EMR version 5.24.1 and Spark version 2.4.2. Cluster contains a master and one worker, both m3.xlarge (15 GB memory). Tested on a larger cluster as well with the same results.
  2. Stack trace below.
  3. It happens for certain files that are larger than 20 GB when I run it manually in the PySpark shell. When running the same code as an EMR step, I get the FileAlreadyExists exception.
  4. We have no issues with any files under 20 GB.
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.github.saurfang.sas.util.PrivateMethodCaller.apply(PrivateMethodExposer.scala:30)
        at com.github.saurfang.sas.parso.SasFileParserWrapper.readNext(ParsoWrapper.scala:81)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.readNext$lzycompute$1(SasRecordReader.scala:159)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.readNext$1(SasRecordReader.scala:153)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.nextKeyValue(SasRecordReader.scala:175)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
Caused by: java.io.IOException: There are no available bytes in the input stream.
        at com.epam.parso.impl.SasFileParser.getBytesFromFile(SasFileParser.java:748)
        at com.epam.parso.impl.SasFileParser.readSubheaderSignature(SasFileParser.java:405)
        at com.epam.parso.impl.SasFileParser.processPageMetadata(SasFileParser.java:374)
        at com.epam.parso.impl.SasFileParser.processNextPage(SasFileParser.java:572)
        at com.epam.parso.impl.SasFileParser.readNextPage(SasFileParser.java:543)
        at com.epam.parso.impl.SasFileParser.readNext(SasFileParser.java:501)
        ... 30 more

thesuperzapper commented on August 23, 2024

@westonsankey
I wonder if it's to do with Scala 2.12 (which is used only in Spark 2.4.2), as we only compile for 2.11 right now. Can you try with Spark 2.4.3 or Spark 2.4.1?

This might become an issue, because we use a crazy hack to access the Parso library, and I doubt it works in Scala 2.12.
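
To check which Scala build the cluster is actually running, something like this in the PySpark shell should work (spark.version is public API; going through the internal _jvm handle to scala.util.Properties is an assumption on my part):

# Print the Spark version and the Scala version of the JVM side of the session.
print(spark.version)                                                  # e.g. '2.4.2'
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. 'version 2.11.12'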

westonsankey commented on August 23, 2024

@thesuperzapper
I tried running on a couple of older EMR/Spark versions, but had no success.

thesuperzapper commented on August 23, 2024

@westonsankey

  1. Are you able to run it in local mode? (A local-mode session sketch follows after this list.)
  2. Can you provide the worker stack trace, rather than just the driver one? (If not running in local mode)
  3. Just to confirm, are you using Scala or PySpark?
  4. I know it will be difficult, but could you create a ~20 GB file with a method similar to how you make your normal one, but filled with random data (unless the dataset can be made public)? [You don't have to do this right away, but it will be quite hard to debug without some kind of file.]
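
For item 1, a minimal local-mode session sketch (the spark-sas7bdat package coordinate and paths are assumptions; check the version published on spark-packages.org for your Scala build):

from pyspark.sql import SparkSession

# Local-mode session that pulls spark-sas7bdat at startup, so the repro can
# run on a single machine outside EMR.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("sas7bdat-repro")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11")
    .getOrCreate()
)

df = spark.read.format("com.github.saurfang.sas.spark").load("/local/path/file.sas7bdat")
df.write.format("csv").mode("overwrite").save("/local/path/output")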

thesuperzapper commented on August 23, 2024

@westonsankey could you also try setting maxSplitSize to a value larger than the file and running it on one server. I know it will take a while, but this will let us confirm whether it's an issue with splitting. (I wonder if it's something to do with page sizes larger than 1 MB.)
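
Roughly the same read sketch as before, but with maxSplitSize set above the file size so the whole file lands in one split (the value and path are placeholders, again assuming the option is in bytes):

df = (
    spark.read
    .format("com.github.saurfang.sas.spark")
    .option("maxSplitSize", 30 * 1024**3)  # larger than the ~20 GB file => single split
    .load("s3://bucket/input/large-file.sas7bdat")
)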

westonsankey commented on August 23, 2024

@thesuperzapper - Tried setting the maxSplitSize to force a single partition, but that resulted in the same error.

I wrote a small Java program using the Parso library to iterate over the rows in the SAS file to see if there were any errors parsing a single row. Turns out that is the issue - I get the IOException indicating that there are no available bytes in the input stream on a single row. Given that, it seems like this is not an issue with spark-sas7bdat.
