astrolabsoftware / spark-fits

FITS data source for Spark SQL and DataFrames

Home Page: https://astrolabsoftware.github.io/spark-fits/

License: Apache License 2.0

Scala 79.61% Shell 8.14% Python 11.09% Java 1.16%
scala spark-sql fits fitsio apache-spark pyspark hdfs

spark-fits's Introduction

FITS Data Source for Apache Spark

Build Status codecov Maven Central Arxiv

Latest news

  • [01/2018] Launch: project starts!
  • [03/2018] Release: version 0.3.0
  • [04/2018] Paper: Arxiv
  • [05/2018] Release: version 0.4.0
  • [06/2018] New location: spark-fits is an official project of AstroLab!
  • [07/2018] Release: version 0.5.0, 0.6.0
  • [10/2018] Release: version 0.7.0, 0.7.1
  • [12/2018] Release: version 0.7.2
  • [03/2019] Release: version 0.7.3
  • [05/2019] Release: version 0.8.0, 0.8.1, 0.8.2
  • [06/2019] Release: version 0.8.3
  • [05/2020] Release: version 0.8.4
  • [07/2020] Release: version 0.9.0
  • [04/2021] Release: version 1.0.0

spark-fits

This library provides two different tools to manipulate FITS data with Apache Spark:

  • A Spark connector for FITS files.
  • A Scala library to manipulate FITS files.

The user interface is the same as for other built-in Spark data sources (CSV, JSON, Avro, Parquet, etc.). Note that spark-fits follows the Apache Spark Data Source V1 API (migration to V2 is planned). See our website for more information. To include spark-fits in your job:

# Scala 2.11
spark-submit --packages "com.github.astrolabsoftware:spark-fits_2.11:1.0.0" <...>

# Scala 2.12
spark-submit --packages "com.github.astrolabsoftware:spark-fits_2.12:1.0.0" <...>

Alternatively, you can link against this library in your program using the following coordinates in your build.sbt:

// Scala 2.11
libraryDependencies += "com.github.astrolabsoftware" % "spark-fits_2.11" % "1.0.0"

// Scala 2.12
libraryDependencies += "com.github.astrolabsoftware" % "spark-fits_2.12" % "1.0.0"

Currently available:

  • Read FITS files and organize the HDU data into DataFrames (see the example below).
  • Automatically distribute bintable rows over machines.
  • Automatically distribute image rows over machines.
  • Automatically infer DataFrame schema from the HDU header.
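
For reference, a minimal PySpark read, consistent with the usage shown in the issues below (the file path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the first extension (HDU 1) of a FITS file into a DataFrame.
df = spark.read.format("fits").option("hdu", 1).load("hdfs://path/to/catalog.fits")
df.printSchema()
df.show(5)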

Header Challenge!

The headers tested so far are fairly simple and not very exotic. Over time, we plan to add many new features based on complex examples (see here). If you use spark-fits and encounter errors while reading a header, tell us (issues or PR) so that we can fix the problem ASAP!

TODO list

  • Define custom Hadoop InputFile.
  • Migrate to Spark DataSource V2

Support

spark-fits's People

Contributors

chrisarnault, dependabot[bot], hrivnac, julienpeloton, only-solutions


spark-fits's Issues

some columns are missing in dataframe when loaded with spark-fits (although present when loaded with topcat/astropy/fitsio)

Hi @JulienPeloton,

Another weird one...it looks as though some columns are missing when loading this file with spark-fits. In this case the g_psfflux* columns are nowhere to be found.

Although these columns can be seen in Topcat:

[screenshots omitted]

and also with astropy / fitsio:

[screenshot omitted]

But when I attempt to find the columns in the spark-dataframe they are not there.

[screenshot omitted]

and therefore df.select('g_psfflux_mag').show() fails (see screenshot and exception)

[screenshot omitted]

Exception from the above screenshot:

{AnalysisException}"cannot resolve 'g_psfflux_mag' given input columns: [i_cmodel_exp_flag, i_pixelflags_rejectedcenter, r_undeblended_convolvedflux_3_kron_mag, y_psfflux_magsigma, y_convolvedflux_1_kron_magsigma_isnull, z_kronflux_mag_isnull, y_convolvedflux_0_kron_mag, z_undeblended_psfflux_magsigma, i_convolvedflux_2_kron_mag_isnull, i_localbackground_flag_badcentroid_isnull, r_cmodel_dev_flag_isnull, r_undeblended_convolvedflux_0_kron_mag_isnull, g_pixelflags_saturatedcenter, z_pixelflags_edge_isnull, r_pixelflags_interpolatedcenter, z_pixelflags, y_undeblended_kronflux_mag_isnull, y_cmodel_initial_flag, y_pixelflags_rejectedcenter, r_undeblended_kronflux_mag_isnull, r_kronflux_magsigma_isnull, i_pixelflags_edge_isnull, z_undeblended_convolvedflux_0_kron_mag_isnull, y_pixelflags_inexact_psf, g_inputcount_flag, i_localbackground_flag_isnull, g_cmodel_mag_inner, r_pixelflags_saturated_isnull, i_localbackground_flag_nogoodpixels, y_cmodel_mag_inner, i_cmodel_initial_mag_inner_isnull...

This is very unusual because the other columns of the same format, r_psfflux*, i_psfflux*, z_psfflux* and y_psfflux*, are parsed just fine:

[screenshots omitted]

They also .show() with no problem:

[screenshot omitted]

Please let me know if you have any idea of what could be causing these columns to go missing.

Thanks again for all your help,
Jacob

DataFrame reader cannot load files as list of paths or comma separated string of paths

I am aware that the Hadoop file system interface used in spark-fits supports globs for loading multiple files; however, it does not seem to work with a list of paths or a comma-separated string of paths.

Would it be possible to add the ability to load FITS files in this way?
It seems fairly standard among Spark DataFrame readers, so it would be super useful :)

(see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=load#pyspark.sql.DataFrameReader.load) .

This works

 files = 'wmap.cluster.*.projected.fits'
 df = sqlc.read.format("fits").option("hdu", 1).load(files)

This does not work:

 # For testing purposes limit number of files (massive speed up).
 files = glob.glob('wmap.cluster.*.projected.fits')[:3]
 # files = <class 'list'>: ['/u/jacobic/magneticum/src/../data/interim/wmap.cluster.453315a6-bd26-a929-2963-b33df1bfccfb.projected.fits', '/u/jacobic/magneticum/src/../data/interim/wmap.cluster.7dff7815-5881-b2cf-8f19-b1a3fe08e949.projected.fits', '/u/jacobic/magneticum/src/../data/interim/wmap.cluster.d77d7156-41cd-89f5-4d07-a60ef45a1934.projected.fits']

 df = sqlc.read.format("fits").option("hdu", 1).load(files)

This also does not work:

 # For testing purposes limit number of files (massive speed up).
 files = ','.join(glob.glob('wmap.cluster.*.projected.fits')[:3])
 # files = /u/jacobic/magneticum/src/../data/interim/wmap.cluster.453315a6-bd26-a929-2963-b33df1bfccfb.projected.fits,/u/jacobic/magneticum/src/../data/interim/wmap.cluster.7dff7815-5881-b2cf-8f19-b1a3fe08e949.projected.fits,/u/jacobic/magneticum/src/../data/interim/wmap.cluster.d77d7156-41cd-89f5-4d07-a60ef45a1934.projected.fits

 df = sqlc.read.format("fits").option("hdu", 1).load(files)

Cheers,
Jacob
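
A possible workaround (a sketch, not part of the original report) until list/comma-separated paths are supported: load each file separately and union the resulting DataFrames. Note, however, the issue "On the multifile problem in spark-fits" further down this page: chaining unions does not scale to hundreds of files.

from functools import reduce
import glob

# Load each file into its own DataFrame, then union them
# (assumes all files share the same schema).
files = glob.glob('wmap.cluster.*.projected.fits')[:3]
dfs = [sqlc.read.format("fits").option("hdu", 1).load(f) for f in files]
df = reduce(lambda a, b: a.union(b), dfs)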

Bad mapping between scalars (Spark) and 1-element vectors (FITS)

Describe the issue
When reading a FITS file containing a 1-element vector, it is mapped onto a 1-element vector, but it should be a scalar instead.

Expected behaviour
It should be a scalar instead

Actual behaviour
Wrong mapping (see above)

Additional context
This issue was absent in version 0.7.3 and present in version 0.8.0

header challenge: cannot infer size of type B from the header

Here is some feedback about an error reading unsigned bytes in fits files.

Keep up the good work! I love this spark package :)

The following error is thrown when calling:

df = sqlc.read.format("fits").option("hdu", 1).load(path)

"FitsLib.getSplitLocation> Cannot infer size of type B
            from the header! See com.astrolabsoftware.sparkfits.FitsLib.getSplitLocation"

An example of the header is below (the FLAG_* columns are the ones causing the problem): example.txt

It looks like the issue is due to not having a case for shortType.contains("B"):

def getSplitLocation(fitstype : String) : Int = {
      val shortType = FitsLib.shortStringValue(fitstype)

      shortType match {
        case x if shortType.contains("I") => 2
        case x if shortType.contains("J") => 4
        case x if shortType.contains("K") => 8
        case x if shortType.contains("E") => 4
        case x if shortType.contains("D") => 8
        case x if shortType.contains("L") => 1
        case x if shortType.endsWith("X") => {
          // Example 16X means 2 bytes
          x.slice(0, x.length - 1).toInt / BYTE_SIZE
        }
        case x if shortType.endsWith("A") => {
          // Example 20A means string on 20 bytes
          x.slice(0, x.length - 1).toInt
        }
        case _ => {
          println(s"""
            FitsLib.getSplitLocation> Cannot infer size of type $shortType
            from the header! See com.astrolabsoftware.sparkfits.FitsLib.getSplitLocation
              """)
          0
        }
      }
}

Thanks again,
Jacob

Add support for Scala 2.12

Future Apache Spark releases will deprecate the use of Scala 2.11 and add Scala 2.12 support.
This has already started with Spark 2.4.2, which includes experimental Scala 2.12 support.

.option('columns', 'col1,col2,col3,col4') does not preserve order

Hi Julien,

Great work on the latest release! here is some more minor feedback.

Since I am now loading a very large number of small files I thought it would be best for me to only load the relevant columns that I need and specify the schema explicitly in order to minimise the overhead (is this what you would recommend?).

I now use the .option('columns', 'comma_separated_column_names') method with spark-fits but noticed that the order of the specified columns is not preserved (see inline variable values and resulting df.show() after loading in the screenshot below).

[screenshot omitted]

The columns seem to be grouped by type rather than alphabetically or by the order of the list given. This would usually not be a problem, but since I also use the columns option with .schema(UserSchema...), this can cause unexpected behaviour, because I set the order of the fields in the UserSchema to be identical to the order in the 'comma_separated_column_names' of the columns option.

The main reason why I want to use these options is to optimise the speed of reading in many files. Please let me know if this methodology is beneficial or if I am heading in the wrong direction. For a single file the header must be read to filter the columns anyway, and I assume the schema is simultaneously inferred, so there might not be any speed benefit to specifying it, although I am not sure if this is still the case if you read in many files and specify the schema manually.

Cheers,
Jacob
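
A possible workaround (a sketch, not part of the original thread; the column names and path are placeholders): re-impose the desired order with an explicit select after the load, matching the order of the user-supplied schema.

cols = ['col1', 'col2', 'col3', 'col4']
df = sqlc.read.format("fits") \
    .option("hdu", 1) \
    .option("columns", ",".join(cols)) \
    .load(path) \
    .select(*cols)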

Benchmarking on s3

I'm trying to benchmark spark-fits on s3, by internally looping over the same piece of code:

path = "s3a://abucket/..."
fn = "afile.fits" # 700 MB

for index in range(N):
  df = spark.read\
    .format("fits")\
    .option("hdu", 1)\
    .load(os.path.join(path, fn))

  start = time.time()
  df.count()
  elapsed = time.time() - start
  print("{} seconds".format(elapsed))

With the default S3 configuration, it hangs after the first iteration, and I get a timeout error. I found that increasing the parameter that controls the maximum number of simultaneous connections to S3 (fs.s3a.connection.maximum) from 15 to 100 somehow fixes the problem. It is not clear exactly why and how, so it would be good to investigate further.
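
A sketch of the configuration change described above, set on the Spark session (the "spark.hadoop." prefix forwards the option to Hadoop):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.connection.maximum", "100") \
    .getOrCreate()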

Strategy for low level direct access to sparse FITS data

=> We consider the BinTable use case, i.e. N columns x M rows.

We want to only access [a subset of columns] AND/OR [a subset of rows]

i.e. is it possible to specify:

"select (col1, col2, ..., colp) from file"
"select (row1, row2, ..., rowq) from file"
"select (col1, col2, ..., colp) and (row1, row2, ..., rowq) from file"

so that only relevant data bytes are really read?

  • This must be a very low level mechanism
  • Ensure data locality => assume some shuffle
  • How to evaluate the threshold of data proportion beyond which the mechanism becomes inefficient?
  • Is it feasible with cfitsio?
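
For what it is worth, column pruning is already possible today through the reader's 'columns' option (see the dedicated issue above); row selection currently has to happen after the load. A sketch with placeholder names:

df = spark.read.format("fits") \
    .option("hdu", 1) \
    .option("columns", "col1,col2") \
    .load("catalog.fits") \
    .filter("col1 > 0")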

FITS files that match glob expressions and contain empty HDU tables (NAXIS=0) cause a fatal error

Hi @JulienPeloton

I have some additional feedback about spark-fits. Perhaps this is an issue too specific to my particular use case, but I thought it was worth mentioning anyway in case there is an opportunity to improve stability :)

I am loading many files at the same time using a glob expression with spark-fits. For 99.9% of these files, my Spark pipeline runs smoothly. In some rare cases, however, the whole pipeline is brought to a halt by files which match the glob expression but have NAXIS = 0 in their header, i.e. an empty table. These files have an almost identical format to all the other files that I want to load into my master dataframe, but when the data is actually loaded from such a file (along with all the other good files with NAXIS = 2) the following error occurs:

  File "/draco/u/jacobic/hyperpipes/src/pipelines/transformers.py", line 65, in _transform
    df.show()
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mpcdf/soft/SLE_12_SP3/packages/x86_64/spark/2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o132.showString.
: java.lang.ArrayIndexOutOfBoundsException: 0
	at com.astrolabsoftware.sparkfits.FitsHduImage$ImageHDU.getSizeRowBytes(FitsHduImage.scala:77)
	at com.astrolabsoftware.sparkfits.FitsLib$Fits.<init>(FitsLib.scala:194)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$checkSchemaAndReturnType$1.apply(FitsSourceRelation.scala:228)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$checkSchemaAndReturnType$1.apply(FitsSourceRelation.scala:226)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.astrolabsoftware.sparkfits.FitsRelation.checkSchemaAndReturnType(FitsSourceRelation.scala:226)
	at com.astrolabsoftware.sparkfits.FitsRelation.load(FitsSourceRelation.scala:288)
	at com.astrolabsoftware.sparkfits.FitsRelation.buildScan(FitsSourceRelation.scala:387)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

This is clearly because in this section of the code, NAXIS is not expected to be 0:

override def getSizeRowBytes(keyValues: Map[String, String]) : Int = {

Whereas the offending part of the header looks like

# HDU 1 in 1033/1/photoObj-001033-1-0042.fits:
XTENSION= 'IMAGE   '           /Image Extension created by MWRFITS v1.8         
BITPIX  =                   16 /                                                
NAXIS   =                    0 /                                                
PCOUNT  =                    0 /                                                
GCOUNT  =                    1 /                                                

As I am ingesting hundreds of thousands of files, it is very tricky for me to manually find out which ones have the empty tables, since it is like finding needles in a haystack (and spark-fits does not point to the offending file). The only ways I can think of to avoid this are to filter by file size or to check all the headers in advance to remove the offending files, which is cumbersome.

As an example, here are two FITS files: 1) normal, 2) empty. The latter is the type of file that I would like to be able to handle (and preferably be warned about) without spark-fits crashing.

normal_and_empty_table_example.zip

photoObj-001033-1-0011.fits <- normal
photoObj-001033-1-0042.fits <- empty

Would it be possible to add a try/catch block in FitsHduImage.scala such that spark-fits is able to ignore empty tables and/or provide some sort of warning (with verbose=True in spark-fits), so the user is aware of such files being skipped or knows that such files are the ones causing the error? This sort of behaviour would be very useful.

Please let me know what you think.

Thanks as always, keep up the good work!

Jacob
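
A sketch of the "check the headers in advance" workaround mentioned above (not part of the original report; paths and HDU index are illustrative): use astropy to drop files whose target HDU has NAXIS = 0 before handing the good files to Spark (e.g. via the union workaround sketched earlier on this page).

import glob
from astropy.io import fits

good, empty = [], []
for path in glob.glob("photoObj-*.fits"):
    with fits.open(path) as hdul:
        # NAXIS = 0 in the target HDU means an empty data unit.
        (good if hdul[1].header.get("NAXIS", 0) > 0 else empty).append(path)

print("Skipping {} empty files".format(len(empty)))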

Python API

The simplest approach (i.e. no extra Python implementation) is probably to define a new DataSource for FITS, such that the FITS InputFormat can be called from Python:

rdd = sc.newAPIHadoopRDD(fn, "com.sparkfits.FitsFileInputFormat", ...)

But that means the current structure of the package will change (probably minimal change for the user interface though).

For reference:

At the top of the TODO list!

A subset of rows from multiple DoubleType columns is incorrectly loaded as very small/large values

Hi @JulienPeloton ,

This one is a bit of a mystery to me, but it looks like spark-fits is unable to parse a subset of the values in a single FITS file with a header that looks like this (header.bad.txt) when used with default settings. The file itself is not so large and can be downloaded here.

It looks like spark-fits sets a subset of the dataframe elements to very small or very large values. To give a bit of additional context, I believe the original file was made from the concatenation of several files, so I originally thought that there might be varying precision in a single column, which could be causing spark-fits to parse incorrect values. Despite this, spark-fits seems to correctly determine the schema.

Another thing to note is all of the offending columns that I happen to be interested in are doubles ('specz_ra', 'specz_dec', 'specz_redshift', 'specz_redshift_err'):

[screenshot omitted]

These problems can easily be highlighted with a show:

df.select('specz_ra', 'specz_dec', 'specz_redshift', 'specz_redshift_err').show()

I can confirm that these values are correct.

[screenshot omitted]

and with summary().show():

df.select('specz_ra', 'specz_dec', 'specz_redshift', 'specz_redshift_err').summary().show()

As you can see, there are some very large/small values which make the summary statistics quite crazy:

[screenshot omitted]

In Topcat the FITS file is parsed just fine, with no extra-large or extra-small values:

[screenshots omitted]

Originally the FITS file has 2.3 million rows; however, after loading with spark-fits and then cleaning the bad (very large/small) values, i.e. ((ra >= 0.0) or (ra <= 360.0)), ((dec >= -90.0) or (dec <= 90.0)), (0 < z, z_err < 5), the dataframe contains only a small fraction of the original rows.

The only way I could resolve the problem was to re-download the exact same file as a .csv to avoid the parsing issue in spark-fits. In Topcat I confirmed that the .csv and .fits files are indeed exactly the same data with no unusual values and that it can parse both with no problem, but I just wanted to let you know in case there was an opportunity to improve spark-fits.

Here is the .csv summary which shows the values that I would expect:

[screenshot omitted]

Please let me know if you have any idea what could be causing the problem.

Cheers,
Jacob

Handling compressed data

See the doc at:

https://archive.stsci.edu/fits/fits_standard/node39.html#SECTION00941220000000000000
=> 5.4.1.2 Conforming Extensions

A compressed image is stored in an extension such as:
https://fits.gsfc.nasa.gov/registry/tilecompression/tilecompression2.3.pdf

XTENSION = BINTABLE
TFIELDS = 1
TTYPE1 = 'COMPRESSED_DATA'
EXTNAME = 'COMPRESSED_IMAGE'
ZIMAGE = T

Zfields:

Zxxx fields give the original header fields for the uncompressed image

datalen (in bits) = |BITPIX| x GCOUNT x (PCOUNT + NAXIS1 x NAXIS2 x ... x NAXISm)

More understanding...

Reading a FITS ZIMAGE:

  • if "ZIMAGE" in kv and "NAXIS" in kv and "PCOUNT" in kv then this is a compressed image HDU
  • the compressed image data is indeed given with the formula above (datalen)
  • each HDU contains
    • the HEADER with ZIMAGE, NAXIS, PCOUNT
    • the original metadata of the uncompressed image HDU prefixed by Z (Zxxx)
    • immediately a set of [(datalen/BLOCKSIZE) + 1] blocks of compressed data
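
A minimal sketch of the datalen / block-count arithmetic above, with illustrative header values (FITS blocks are 2880 bytes; Python 3.8+ for math.prod):

import math

BITPIX, GCOUNT, PCOUNT = 8, 1, 107884   # illustrative header values
naxes = [1, 1]                          # NAXIS1 ... NAXISm, illustrative

datalen_bits = abs(BITPIX) * GCOUNT * (PCOUNT + math.prod(naxes))
nblocks = math.ceil(datalen_bits / 8 / 2880)   # number of 2880-byte FITS blocks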

The algorithm to uncompress the data can be found in the "fits_uncompress_table" function in the "imcompress.c" source file of the cfitsio package.

(to be cont'd)

CA

DataFrame: Last element is sometimes missing

I've noticed that sometimes, the last element of a Table HDU or the last row of an Image HDU is missing.
Here is a pyspark example of a 3D image (200 x 200 x 200):

# Using standard tools to open the file
from astropy.io import fits
import numpy as np

data = fits.open("velocity_field_20.fits")
for i in range(3):
    print(np.shape(data[i].data))
# (200, 200, 200)
# (200, 200, 200)
# (200, 200, 200)
# Using spark-fits
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

for i in range(3):
    df = spark.read.format("fits").option("hdu", i).load("velocity_field_20.fits")
    print(df.count())
# 39999 (i.e 200x200 - 1 --> Wrong!)
# 40000 (i.e 200x200 --> OK)
# 39999 (i.e 200x200 - 1 --> Wrong!)

Most likely there is an index mismatch in FitsRecordReader.scala.

Add FITS header check as a user option

The PR #55 fixed a bug with the header check (the connector was checking all FITS headers before starting the job; a good idea until you have 10,000+ files...). The fix is, however, temporary, as you can see below:

// Check that all the files have the same Schema
// in order to perform the union. Return the HDU type.
// NOTE: This operation is very long for hundreds of files!
// NOTE: Limit that to the first 10 files.
// NOTE: Need to be fixed!
val implemented = if (listOfFitsFiles.size < 10) {
  checkSchemaAndReturnType(listOfFitsFiles)
} else{
  checkSchemaAndReturnType(listOfFitsFiles.slice(0, 10))
}

The idea would then be to add an option to the DataFrameReader (e.g. checkHeader), false by default, that would trigger the header checker.
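
From the user side, the proposed option could look like this (hypothetical usage, not yet implemented; "checkHeader" is the name suggested above and the path is illustrative):

df = spark.read.format("fits") \
    .option("hdu", 1) \
    .option("checkHeader", True) \
    .load("hdfs://path/to/many/*.fits")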

Low throughput

The current throughput is around 5-10 MB/sec to load and convert FITS data to a DataFrame.
The decoding library needs to be improved...

header challenge: header with multivalued columns raises "java.lang.ArithmeticException: / by zero" during df.select(any_column).show() or df.select(any_column).take()

Hi guys just wanted to give some more feedback about my favourite spark package!

I encountered an error reading a FITS file with an "exotic header". I assume the issue is due to the columns which contain data arrays. I would expect spark-fits to load multivalued columns as vectors, but I think it might be causing bigger problems, as I cannot view any columns.

For example when I read the data:

path = 'photoObj-001000-1-0027.fits'
df = sqlc.read.format("fits").option("hdu", 1).load(path)

The following error is thrown when calling:

df.select('OBJID').show()

The header is shown here example.txt and the file itself is zipped here photoObj-001000-1-0027.fits.zip

Before the error the schema is inferred:

[screenshot omitted]

Despite this, the multivalued columns (e.g. code 5E with shape 5, such as 'MODELMAG') are treated as floats. I would expect them to be treated as vectors. Is this possible?

[screenshots omitted]

The error itself occurs after selecting any column (even if it is a regular non-multivalue column) and then applying the .take(n) or .show(n) method:

com.github.astrolabsoftware#spark-fits_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb;1.0
	confs: [default]
	found com.github.astrolabsoftware#spark-fits_2.11;0.7.3 in central
:: resolution report :: resolve 183ms :: artifacts dl 3ms
	:: modules in use:
	com.github.astrolabsoftware#spark-fits_2.11;0.7.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/5ms)
2019-05-06 19:07:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2019-05-06 19:08:28 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 0:>                                                          (0 + 1) / 1]2019-05-06 19:08:30 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArithmeticException: / by zero
	at com.astrolabsoftware.sparkfits.FitsRecordReader.nextKeyValue(FitsRecordReader.scala:318)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-05-06 19:08:30 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArithmeticException: / by zero
	at com.astrolabsoftware.sparkfits.FitsRecordReader.nextKeyValue(FitsRecordReader.scala:318)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Please let me know if you require any additional information or have any questions,
Cheers,
Jacob

NaNs are present even after filtering out NaNs when reading large FITS files (~1 GB)

Hi Julien,

I hope you are keeping well and healthy :)

I was wondering if you could help me with a spark-fits issue that I have been having when reading large fits files ~1GB (e.g. https://portal.nersc.gov/project/cosmo/data/legacysurvey/dr8/north/sweep/8.0/sweep-100p030-110p035.fits).

It seems that even when I remove all NaNs from the dataframe, there still appear to be NaNs present when a collect/show/toPandas is called. The fraction of NaNs is small (<0.1%) but it does cause some data loss and means that I have to filter the data a second time after collection from the JVM. This issue also causes summary/aggregation statistics etc. to be incorrect.

Troubleshooting

The issue is random, and therefore different elements of the dataframe are impacted each time the same collect/show/toPandas call is made with the same file and the same dataframe. This also means the total number of NaNs changes each time. The data is read directly from the FITS file(s) using spark-fits each time the call is made (rather than from a cached dataframe), which is why I think the FITS reading stage could be causing the problem.

Perhaps it is related to how spark-fits splits up the file. I have tried various record length settings (recordlength=1 * 1024, recordlength=10 * 1024, recordlength=100 * 1024) but I am not sure what is optimal or if it could alleviate this issue.

The columns that I filter on are mag_*, which I derive from FLUX_* and FLUX_IVAR_* columns in the example fits file using a simple pyspark SQL expression.

I filter out the following cases for each mag_* column.

'mag_* > 27.0'
'mag_* <= 0.0'
'isnan(mag_*)'
'isnull(mag_*)'

These expressions are evaluated successfully and appear in the logical plan. However, on collection, some NaNs which should not be there have somehow been generated. The same code also works with smaller FITS files.

I also checked to see if any columns were incorrectly cast as DecimalType() (as this could also cause NaNs due to being incorrectly cast as the numpy object dtype). But since I explicitly cast all FLUX_* and FLUX_IVAR_* columns as FloatType(), the resulting mag_* columns are also FloatType(), so this is not the cause of the problem.

Please let me know if you have any idea what could be causing this problem or if you require any more detailed screenshots/code samples to help troubleshoot.

I thought it could be related to this spark-fits bug I raised last year #84

Thanks @jacobic, it helps a lot to understand.

Actually, I dug deep inside spark-fits. When the file is read on my computer, it basically makes chunks of 33,554,432 bytes. Guess what? 33,554,432 is very close to 162829 [rows] * 206 [bytes/row]... So it seems there is a mismatch in the starting index when reading a new chunk of this file. Looking deeper, I found the bug... The bug was introduced by an ugly fix for Image HDUs:

// Here is an attempt to fix a bug when reading images:
//
// I noticed that when an image is split across several HDFS blocks,
// the transition is not done correctly, and there is one line typically
// missing. After some manual inspection, I found that introducing a shift
// in the starting index helps removing the bug. The shift is function of
// the HDU index, and depends whether the primary HDU is empty or not.
// By far I'm not convinced about this fix in general, but it works for
// the few examples that I tried. If you face a similar problem,
// or find a general solution, let me know!
var shift = if (primaryfits.empty_hdu) {
  FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 3)
} else {
  FITSBLOCK_SIZE_BYTES * (conf.get("hdu").toInt - 1)
}

I'm still not 100% sure this is the final word, but fixing it seems to give the correct answer:

# +-------+--------------------+-------------------+---------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+
# |summary|           object_id|              d_pos|    d_mag|        specz_name|          specz_ra|         specz_dec|    specz_redshift|specz_redshift_err|specz_original_flag|specz_mag_i|
# +-------+--------------------+-------------------+---------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+
# |  count|             2299625|            2299625|  2299625|           2299625|           2299625|           2299625|           2299625|           2299625|            2299625|    2299625|
# |   mean|4.325592616529654...|0.20328246733677513|      NaN|              null|157.35647009225943|2.6757728602627218|0.6220279768352934|               NaN| 2.8340504323962143|        NaN|
# | stddev|8.044115412593749E15|0.19084386174782547|      NaN|              null|119.00443164434095|12.329133357718852|1.1161051065508547|               NaN| 23.447103620806015|        NaN|
# |    min|   36407037608854089|        5.951991E-4|-Infinity| 3DHST_AEGIS_10002|           6.86E-4|         -7.282074|          -9.99999|               0.0|                 -1|  -9999.166|
# |    max|   76557903720366950|          1.0048198|      NaN|ZCOSMOS-DR3-950074|         359.99936|         56.899819|            9.9999|               NaN|                B,1|        NaN|
# +-------+--------------------+-------------------+---------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------+

I will make a fix tomorrow.

Originally posted by @JulienPeloton in #84 (comment)

Thanks as always,

Jacob

Long row size (> 1KB): java.lang.ArithmeticException: / by zero

As found in #69, if the row size is above 1 KB, and the option recordLength is not specified, the code fails with the error java.lang.ArithmeticException: / by zero.

For the background: the recordlength option controls how the data is split and read inside each HDFS block (or more precisely inside each InputSplit, as they are not the same) by individual mappers for processing. 1 KB seemed to give good performance (and is large enough for most of the FITS files I was using so far); for larger values you might suffer from longer garbage collection times.

However, the current error is not very explicit.
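
Until the error message is improved, a sketch of the explicit workaround (the option name is the one used elsewhere on this page; the value is in bytes and purely an example, and path is a placeholder):

df = spark.read.format("fits") \
    .option("hdu", 1) \
    .option("recordlength", 100 * 1024) \
    .load(path)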

Zipped files

We need to understand whether we can handle zipped files (that is, unpack blocks in HDFS!).
Does fpack do this, or do we need to implement something new? Or do we need to unpack files before putting them into HDFS?

spark-fits write mode

Hi Julien,

Long time no speak! :)

I still use this package every day and was just wondering if you have considered creating a 'fits' writer interface? For instance, the ability to group a PySpark dataframe by a number of columns and then write to FITS files in directories partitioned by the values in those columns? At the moment I use a combination of pandas_udfs and astropy.table to do this, but it is a bit messy and it would be a lot cooler (and better performing) if it worked out of the box with spark-fits.

Please let me know what you think.

Thanks again,
Jacob
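
A rough sketch of the pandas_udf + astropy.table workaround described above (an editor's illustration, not the author's code; assumes Spark 3.0+ applyInPandas, astropy on the executors, and a hypothetical partitioning column "field" and output path):

import os
import pandas as pd
from astropy.table import Table

def write_group_to_fits(pdf: pd.DataFrame) -> pd.DataFrame:
    # One group == one value of the hypothetical partitioning column "field".
    field = pdf["field"].iloc[0]
    outdir = "/tmp/output/field={}".format(field)
    os.makedirs(outdir, exist_ok=True)
    Table.from_pandas(pdf).write(os.path.join(outdir, "part.fits"),
                                 format="fits", overwrite=True)
    # Return a small status frame so Spark has something to collect.
    return pd.DataFrame({"field": [field], "nrows": [len(pdf)]})

(df.groupBy("field")
   .applyInPandas(write_group_to_fits, schema="field string, nrows long")
   .show())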

Read data from s3

I have been trying to read FITS files in a Ceph cluster from s3. It throws the following error:

Py4JJavaError: An error occurred while calling o63.load.
: java.io.EOFException: Attempted to seek or read past the end of the file 741911040
	at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:81)
	at org.apache.hadoop.fs.s3a.S3AInputStream.seek(S3AInputStream.java:115)
	at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
	at com.astrolabsoftware.sparkfits.FitsLib$Fits.getNHDU(FitsLib.scala:404)
	at com.astrolabsoftware.sparkfits.FitsLib$Fits.<init>(FitsLib.scala:148)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$schema$1.apply(FitsSourceRelation.scala:364)
	at com.astrolabsoftware.sparkfits.FitsRelation$$anonfun$schema$1.apply(FitsSourceRelation.scala:360)
	at scala.Option.getOrElse(Option.scala:121)
	at com.astrolabsoftware.sparkfits.FitsRelation.schema(FitsSourceRelation.scala:360)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

I'm not sure whether this is just a bug in the code (like an index mismatch that was not caught by e.g. Hadoop), or whether one needs to implement new things... First thing to look at: getNHDU.

Better handling of the partitions

This is linked to the issue #6, but somewhat different.
The way it is done now, the partition size is roughly equal to the HDFS block size (~128 MB). Ideally the number of partitions should follow the resources of the cluster (typically 2-3x the number of cores or executors in use).
I guess repartitioning at the very end would be very costly though... Need to investigate at a lower level then.
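
For reference, the manual (and costly) workaround mentioned above looks like this in PySpark; the 3x factor is only the rule of thumb quoted above:

num_cores = spark.sparkContext.defaultParallelism
df = df.repartition(3 * num_cores)   # triggers a full shuffle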

spark-fits produces duplicated rows when reading some fits files

Hi,

I've uncovered what appears to be a serious bug that produces an extra (duplicated) row when loading FITS files inside a specific size range.

I was trying to convert a directory full of FITS files into Parquet to then ingest them into Hive.
spark-fits is very useful here to circumvent pandas, as pandas does not support complex data types (list, map, struct...), so kudos :P

For instance, this file (1.4 GB, sorry I could not find a smaller example):
https://portal.nersc.gov/project/cosmo/data/legacysurvey/dr8/south/sweep/8.0/sweep-160p025-170p030.fits

When loaded using astropy.io.fits or astropy.table.Table, the file reports 3147687 rows, as also given by the NAXIS2 header value.

When opening with spark-fits it reports 3147688 rows, one extra.

I've prepared a google colab notebook to reproduce the error:
https://colab.research.google.com/drive/1NRqxhOXwGiW7tTRJmyB8HpK1mEff32zH?usp=sharing

Software versions:
Python 2
spark 2.4.4
astropy 2.0.14
pandas 0.24.2
numpy 1.16.4
com.github.astrolabsoftware:spark-fits_2.11:0.8.3
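
A minimal sketch of the cross-check described above, comparing the spark-fits row count against NAXIS2 from the header:

from astropy.io import fits

path = "sweep-160p025-170p030.fits"
with fits.open(path) as hdul:
    expected = hdul[1].header["NAXIS2"]   # 3147687 for this file

actual = spark.read.format("fits").option("hdu", 1).load(path).count()
print(expected, actual)   # the report above finds actual == expected + 1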

On the multifile problem in spark-fits

The current implementation is not great for reading many files (100+).

Current implementation, and why this is not great.

The way we read and distribute the data from many files is by:

  1. list all the files to read,
  2. for each file, create an RDD from the HDU data,
  3. make the union of the RDDs recursively:
// Union if more than one file
for ((file, index) <- fns.slice(1, nFiles).zipWithIndex) {
  rdd = if (implemented) {
    rdd.union(loadOneHDU(file))
  } else {
    rdd.union(loadOneEmpty)
  }
}

While it is very simple and works great for a small number of files, it completely explodes when the number of files gets big (100+). There are two problems: (1) the RDD lineage gets horribly long (to enforce fault tolerance, the DAG keeps track of every single RDD) and the Spark overhead is going to absolutely dominate everything, and (2) for file sizes smaller than the partition size, the union of RDDs creates one (quasi-empty) partition per file, which is extremely sub-optimal. Here is the DAG for reading 3 files, where you can see how the final RDD is created:

[DAG screenshot omitted]

Moreover, for a large number of files (i.e. a large number of rdd.union calls), you are not just sub-optimal, you run into a deeper problem:

Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError
       at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2505)
       at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:348)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

Small fix, and why this is not great either.

Instead of performing multiple RDD unions, one can squeeze all the RDDs into a single union (changing step 3 above).
For that, one can use the union method of the SparkContext directly:

// Union if more than one file
val rdd = if (implemented) {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneHDU(file))
  )
} else {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneEmpty)
  )
}

The DAG gets updated nicely:

[DAG screenshot omitted]

and I could go up to 1000 files. While I do not encounter the StackOverflowError anymore, for more than 1000 files I got the nasty:

2018-10-18 08:33:30 WARN  TransportChannelHandler:78 - Exception in connection from /134.158.75.162:47942
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:357)
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
	at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271)

Of course! Looking at the DAG, it is obvious that this series of unions cannot scale... We need to implement a different approach.

How this could be fixed

We could instead forget about this union strategy, and focus on what is used by other connectors (PartitionedFile).

What should really be done

Rewrite everything to comply with Data Source V2... ;-)

Slashes in TTYPEn are not handled

Hi,

While trying to read a bunch of FITS files, spark-fits fails, complaining that they have duplicated column names, as it truncates each column name to the value before the slash. An extract of the header:

TTYPE8  = 'lsst/u_MEAN'                                                         
TFORM8  = 'E       '                                                            
TTYPE9  = 'lsst/g_MEAN'                                                         
TFORM9  = 'E       '                                                            
TTYPE13 = 'euclid/VIS_MEAN'                                                     
TFORM13 = 'E       '                                                            
TTYPE14 = 'euclid/Y_MEAN'                                                       
TFORM14 = 'E       '                                                            

The binary table extension documentation, for the TTYPEn keyword states:

The value field for this indexed keyword shall contain a character string, giving the name of field n. It is recommended that only letters, digits, and underscore (hexadecimal code 5F, ``_'') be used in the name. String comparisons with the values of TTYPEn keywords should not be case sensitive. The use of identical names for different fields should be avoided.

So slashes are not recommended, but neither excluded nor forbidden :)

Relationship between SparkFits dataframe and source fits file?

Just started looking at SparkFits for a large-scale data modelling and analysis project for radio astronomy - we'll be dealing with very large image cubes (currently testing on a 350 GB Stokes 3D FITS cube, but will be testing further on 1 TB to multi-TB FITS files in future).

Apologies if this is a fairly simple question, but the documentation isn't very clear: how does SparkFits instantiate the data frame from a 3D cube? I.e. we have an experimental 3D image cube (~350 GB), covering a sky coordinate area of ~10 x 10 degrees (Right Ascension and Declination) over ~2590 frequency channels. Pixel measurements are in Hz. The cube dimensions are Ra 5,607, Dec 5,694 and Freq channels 2,592, so the cube shape (Ra, Dec, Freq) is (5607, 5692, 2592).

The data frame has 14,544,168 records, each image record is an array of 5607 elements, so if the data frame was expressed as a 2D matrix, it would have the shape of (14544168, 5607).

Each value in each image array is a pixel value in Hz for a specific Ra, Dec and Frequency channel, correct?

Given that the image column in each row is 5607 elements, is it correct to assume each row represents the pixel values for all Ra positions for one specific Dec and Frequency channel value?

Assuming the above is correct, an image for one specific frequency channel would be a 2D array (dec, Ra) of (5694, 5607) - within the SparkFits data frame, how would this be extracted? Rows 1-5694 for one specific frequency channel, subsequent groups of 5694 rows for subsequent frequency channels?

And is it possible to extract the actual values for the Ra, Dec and Frequency values from the data frame, or does this need to be pulled from the fits header?

Again, apologies if these are simple questions - and thanks for your time.
