Hi @GeoffDuniam
Thanks for the detailed questions! Before starting, I must say that the treatment of images within spark-fits is still experimental compared to the more established case of tables (but this is a good occasion for me to improve it!).
Each value in each image array is a pixel value in Hz for a specific RA, Dec and frequency channel, correct?
Correct
Given that the image column in each row is 5607 elements, is it correct to assume each row represents the pixel values for all Ra positions for one specific Dec and Frequency channel value?
Correct
Assuming the above is correct, an image for one specific frequency channel would be a 2D array (Dec, RA) of (5694, 5607). Within the spark-fits data frame, how would this be extracted? Rows 1-5694 for one specific frequency channel, then subsequent groups of 5694 rows for subsequent frequency channels?
Yes correct, the 5694 first dataframe rows would correspond to the first image (i.e. first frequency channel), etc.
But in order to manipulate images (instead of rows of images), you would have to reshape the Spark DataFrame. This is a real bottleneck in spark-fits, as there is currently no easy solution... The most naive approach would be to assign an index to each DataFrame row and group by this index, assuming each image of the cube has the same size. This is something I could add in spark-fits (returning the image index for each row as an option), but the grouping operation would add potentially large overhead in execution time (it involves data shuffling...). Another approach would be to force spark-fits to return one image per DataFrame row, but that would surely take some time to implement.
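The naive index-and-group approach could be sketched as below. This is only an illustration, not part of spark-fits: the helper name is ours, and 5694 (the number of Dec rows per channel in the cube discussed here) stands in for whatever the header's NAXIS2 actually is.

```python
def image_index(row_index, rows_per_image):
    """Map a global DataFrame row index to the index of the image
    (frequency channel) that row belongs to, assuming every image
    in the cube has the same number of rows."""
    return row_index // rows_per_image

# With a DataFrame `df` loaded by spark-fits, the grouping could look like:
#   rdd = df.rdd.zipWithIndex()                       # (Row, global index)
#   keyed = rdd.map(lambda x: (image_index(x[1], 5694), x[0][0]))
#   images = keyed.groupByKey()                       # one entry per image
# The groupByKey shuffles all pixel data, hence the overhead mentioned above.
```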
And is it possible to extract the actual values for the Ra, Dec and Frequency values from the data frame, or does this need to be pulled from the fits header?
Currently no, it needs to be pulled from the FITS header. spark-fits does use some of the header data internally, but it is not exposed to the user. I usually use an external library like astropy whenever I want to explore the header further. As a future improvement for spark-fits, I could imagine returning a dataframe with the header values.
Julien
from spark-fits.
To be more specific, here is how spark-fits would currently load the data of a 3D cube:
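A minimal sketch (the path is a placeholder, and `spark` is assumed to be an active SparkSession with spark-fits on the classpath). spark-fits flattens the cube so that each DataFrame row holds one image row along the fastest-varying axis:

```python
# Hypothetical load call, matching the API used later in this thread:
#   df = spark.read.format('fits').option('hdu', 0).load('/path/to/cube.fits')
#
# For a cube of shape (nfreq, ndec, nra), the resulting DataFrame has
# nfreq * ndec rows, each holding an array of nra pixel values:

def expected_rows(nfreq, ndec):
    """Number of DataFrame rows spark-fits produces for an
    (nfreq, ndec, nra) image cube: one row per (freq, dec) pair."""
    return nfreq * ndec

# e.g. for the 5-channel test cube discussed below: 5 * 5654 = 28270 rows
```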
Hi Julien,
Thanks for the reply, it clarifies a lot for us. We'll potentially be looking at large numbers of FITS image files over 1 TB, so we're having to investigate how best to get the data to the end users. We're interested in spark-fits because it gives us a data frame we can save as a Parquet table on HDFS, which lets us start processing with Spark on YARN without having to extract the data from a FITS or spectral cube first (a transformation that would carry a large overhead for a multi-TB file). If we can configure the data with a Hive schema, we can then use Spark SQL to pull subsets of the data into a Spark-enabled Jupyter notebook for analysis.
As you say, reshaping the spark-fits data frame into a format that is easier for end users to manipulate is something of a challenge; we're investigating possibilities, but at the moment no clear solution has presented itself.
We'll keep you posted on progress. Also, please do let us know if we can test any feature for you.
Thanks again,
Geoff
Hi Julien,
Just a quick question - we've experimented with creating a partitioned Parquet table on Hive/HDFS to persist the data frame from spark-fits. The Parquet table is physically partitioned by channel and source filename, as we'll be storing multiple FITS files for analysis, the smallest of which is going to be around 800+ GB. The channel value is added to the data frame, as is an index value for the rows - a fairly expensive operation, but it's only done once on load, and the data is then available via a Spark sqlContext() call to create a data frame from the persisted data. Once the table is created, the extraction of subcube data is straightforward. All good so far.
However, if we extract a full channel (one image), it looks very much like we have an offset in the data. This is what the original image for one channel looks like:
If we pull the image data from the Parquet table and reshape it into a 2D image, we get this:
I'm not sure how or why this is happening, any ideas?
Addressing the positional data question: we're also pulling positional and spectral data from the FITS header and storing it as an additional dimension table - again, an expensive operation, but it's only done once.
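Building such a dimension table boils down to turning the header's linear WCS keywords into coordinate arrays. A hedged sketch, assuming the standard CRVALn/CRPIXn/CDELTn convention (the helper name is ours, and the astropy calls are shown as comments since they need a real file):

```python
import numpy as np

def axis_values(crval, crpix, cdelt, n):
    """World coordinates along one FITS axis from its WCS keywords,
    using the linear convention value = CRVAL + (pixel - CRPIX) * CDELT,
    with FITS pixels counted from 1."""
    pixels = np.arange(1, n + 1)
    return crval + (pixels - crpix) * cdelt

# With astropy, the keywords would come from the header, e.g.:
#   from astropy.io import fits
#   hdr = fits.getheader('/path/to/cube.fits')
#   ra = axis_values(hdr['CRVAL1'], hdr['CRPIX1'], hdr['CDELT1'], hdr['NAXIS1'])
# and similarly for the Dec and frequency axes.
```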
Thanks again, Julien, appreciate your time.
Regards
Geoff
Hi @GeoffDuniam,
Thanks for reporting this issue!
Which version of spark-fits did you use? Do you see the same offset using the 0.8.3 or the 0.8.4 (latest)?
Would it be possible to reproduce the problem on a small subset of data and share it with me? It does not need to contain any physical information, but playing with the same data structure as you are using would help me understand where the problem is coming from.
Thanks!
Hi Julien,
No problem, I can take one channel of the data from one of the FITS files we're using - it'll be a 2D image, ~5500 x 5500 pixels.
However, it just occurred to me (confirmed by checking both the Jupyter kernel and the spark2-submit routines we're using) that we're still on 0.7.2. That being the case, before I get you to spend time checking this for us, it'd probably be worthwhile upgrading to 0.8.4 and reloading the data. Yes?
Thanks again, Julien, appreciate your time. Let me know if you want the data or you'd rather wait until we upload the data again using 0.8.4.
Cheers
Geoff
Hi @GeoffDuniam
Thanks for your feedback. I would suggest trying 0.8.4 first, as there were important fixes made since 0.7.2. If the problem persists, I would be happy to check on a small dataset. Note that I will be offline from July 5 until July 15.
Best,
Julien
Hi Julien,
Upgraded our package to 0.8.4, and we're still seeing the same result.
Probably good to give you some background: the FITS source we're using is one of two Stokes image cubes, with respective sizes of 307 and 835 GB. The cubes are nominally polarised 4-dimensional cubes (RA, Dec, Polarisation, Frequency), but they have only one level of polarisation, so they're effectively 3-dimensional. We're creating the RDD from the data frame with an index (return df.rdd.zipWithIndex()), and we can't use .glom() on files of this size because we run into memory limitations on the cluster (three masters, ten workers, and one edge node supporting JupyterHub). The index is there to calculate the specific frequency bands so we can physically partition the data in a Parquet table, and it also allows us to reference the declination values, which we pull out of the HDU.
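The index arithmetic described above can be sketched as follows. This is an illustration under assumptions, not the actual pipeline: the helper name is ours, and 5694 (Dec rows per channel in the big cube) stands in for the header's real NAXIS2.

```python
def channel_and_dec_row(index, rows_per_channel):
    """Split a global row index (from zipWithIndex) into
    (frequency channel, Dec row within that channel)."""
    return index // rows_per_channel, index % rows_per_channel

# The channel can then be attached to each row and used as a physical
# partition key when persisting to Parquet, roughly:
#   rdd = df.rdd.zipWithIndex()
#   with_meta = rdd.map(lambda x: (*channel_and_dec_row(x[1], 5694), x[0][0]))
#   spark.createDataFrame(with_meta, ['channel', 'dec_row', 'image']) \
#        .write.partitionBy('channel').parquet('/path/to/table')
```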
I've done an image extraction from both the data frame and the data stored on the Parquet table and the image still has that offset. I'll generate a subset of the smaller FITS cube and get it to you.
Thanks again for the help, much appreciated.
Cheers
Geoff
Hi Julien,
I've got two FITS files that demonstrate the issue, one with two channels and one with five. Interestingly, the smaller two-channel file displays the images correctly, whereas the five-channel file shows the offset. I won't be able to attach them here - the smaller file is ~120 MB and we have a 10 MB limit. I could put them up on Dropbox and share them from there? Let me know.
Thanks again and regards,
Geoff
Hi Julien,
if it helps, I'll give you the links to download the original FITS files from Pawsey as well - the residual SB8170 cube is the big one, 835 GB. Both FITS files are public access.
https://data.pawsey.org.au/download/SKAvis/residual.i.SB8170.cube.fits
HTH. I'm getting the sub cubes up now and I'll let you know when they're available.
Enjoy your break!
Cheers
Geoff
Hi @GeoffDuniam,
Thanks for the detailed explanations! Sharing the sub cubes (two-channel and five-channel files) via dropbox is fine by me. Let me know.
Best,
Julien
Hi Julien,
No problem, I have them up on our Cloud Share here - drop me a mail with your address and I'll get the download link to you.
geoff dot duniam (at) gmail
Cheers
Geoff
Thanks @GeoffDuniam - thanks to the data you sent, I think I've understood the problem. Long story short: when writing data to disk, Spark shuffles the data blocks (which do not coincide with the original images), and when the data is reloaded, the original row order is lost. The solution is to attach the image index to each row when loading data with spark-fits. I will make this feature available in the next spark-fits release (hopefully by the end of the week).
Below is a detailed summary of the problem and its solution (using the data you sent):
from astropy.io import fits
import matplotlib.pyplot as plt
import numpy as np
Loading data with spark-fits
Let's load images stored in FITS using the spark-fits connector:
filename = '/path/to/Julien5.fits'
nfrequency = 5
df = spark.read.format('fits').option('hdu', 0).load(filename)
This file contains several images split into rows:
nrow = df.count()
msg = """
Total number of rows: {}
Number of frequencies: {}
Number of Dec measurements per frequency: {}
Number of RA measurements per frequency: {}
""".format(nrow, nfrequency, int(nrow/nfrequency), len(df.take(1)[0][0]))
print(msg)
Total number of rows: 28270
Number of frequencies: 5
Number of Dec measurements per frequency: 5654
Number of RA measurements per frequency: 5607
# Take the first channel
channel = df.select('Image').take(int(nrow/nfrequency))
# Converting Row into list
channel = [i[0] for i in channel]
# plot the image
plt.imshow(channel)
Comparison with standard FITS tools
Let's load the same image data with standard FITS tools from astropy:
datafits = fits.open(filename)
print('(Frequency, 1, Dec, RA)')
print(datafits[0].data.shape)
(Frequency, 1, Dec, RA)
(5, 1, 5654, 5607)
plt.imshow(datafits[0].data[0][0])
maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(datafits[0].data[0][0])))
print('Difference between spark-fits and standard tools: {}'.format(maxdiff))
Difference between spark-fits and standard tools: 0.0
Good - there is no difference between the data read by spark-fits and astropy!
Saving in Parquet and reloading data
Let's now save the data read by spark-fits on disk in the parquet format (actually, we could save in any format available to Spark)
df.write.parquet(filename.replace('.fits', '.parquet'), compression='none')
Let's reload this data as-is, and plot the first image as we did previously
df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '.parquet'))
# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow/nfrequency))
# Converting Row into list
channel2 = [i[0] for i in channel2]
# plot the image
plt.imshow(channel2)
Oooooops! This time the image is different, as if it were shifted or the data shuffled. What happened? To understand, one needs to look at how the data is distributed at each stage.
Original FITS data
The original FITS data is contained in the primary HDU. The cube is a 3D array (freq, dec, ra), and the data block is just the concatenation of all the 2D images. There is no ambiguity between different images.
spark-fits partitioning logic
When spark-fits reads data from disk, it pulls data in blocks (aka partitions). The size of each block is determined by (1) the underlying storage system, if the data is already distributed, and/or (2) Spark internal constraints to optimise data I/O. But the blocks do not have to follow the logic of the data itself: a block holds a sequence of rows that need not align with image boundaries. Hence there is no guarantee that the blocks created by spark-fits follow the original images. However, as long as the data sits in a single file, spark-fits still reads it in order (first row of the first image, then the second row, and so on), so the user still has the illusion of manipulating data ordered as in the original FITS file.
Saving data on disk, and reloading
When saving the data, each Spark mapper writes its data blocks to disk. Mappers are not ordered, so the write operation is a race between them! The mapper holding the data for the first image is not guaranteed to write first (that's the beauty of distributed computation). Spark writes to disk all the blocks previously defined by spark-fits, but in complete disorder - very much like a puzzle. When the user re-reads the data from disk, the blocks themselves are unchanged, but their order is randomised.
Which solution then?
As is, once the data is reloaded there is no way to reassemble it, since the order is lost during block creation. This is something I did not envisage, as I was not really using images with spark-fits. The simplest solution is to add an image index to each row upon DataFrame creation with spark-fits, so that the user can repartition the data accordingly.
Fixing the partitioning (new in spark-fits 0.9.0)
We save the (unordered) data on disk and reload it using Spark. The data is then repartitioned to put one image per partition. This feature is only available in spark-fits 0.9+.
# repartition according to the image index
df_repartitioned = df.repartition('ImgIndex')
# Write on disk
df_repartitioned.write.parquet(filename.replace('.fits', '2.parquet'), compression='none')
# Reload
df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '2.parquet'))
# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow/nfrequency))
# Converting Row into list
channel2 = [i[0] for i in channel2]
# plot the image
plt.imshow(channel2)
maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(channel2)))
print('Difference between fits and parquet: {}'.format(maxdiff))
Difference between fits and parquet: 0.0
Note that with this repartitioning, you can even manipulate each image separately using mapPartitions:
# 1 partition = 1 image
df.rdd.mapPartitions(image_processing_function)
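For illustration, a minimal sketch of such a function (the name `image_processing_function` comes from the snippet above; the row layout - pixel array as the first field - is an assumption about the DataFrame produced earlier):

```python
import numpy as np

def image_processing_function(rows):
    """Reassemble the rows of one partition (= one image, after
    repartitioning by 'ImgIndex') into a 2D numpy array, then emit
    a per-image statistic - here simply the maximum pixel value."""
    image = np.array([list(row[0]) for row in rows])
    if image.size:
        yield image.max()

# Usage on the repartitioned DataFrame:
#   per_image_max = df.rdd.mapPartitions(image_processing_function).collect()
```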