

JulienPeloton commented on September 23, 2024

Hi @GeoffDuniam

Thanks for the detailed questions! Before starting, I must say that the treatment of images within spark-fits is still experimental compared to the more established case of tables (but this is a good occasion for me to improve it!).

Each value in each image array is a pixel value in Hz for a specific RA, Dec and frequency channel, correct?

Correct

Given that the image column in each row is 5607 elements, is it correct to assume each row represents the pixel values for all RA positions for one specific Dec and frequency channel value?

Correct

Assuming the above is correct, an image for one specific frequency channel would be a 2D array (Dec, RA) of (5694, 5607) - within the SparkFits data frame, how would this be extracted? Rows 1-5694 for one specific frequency channel, subsequent groups of 5694 rows for subsequent frequency channels?

Yes correct, the first 5694 dataframe rows would correspond to the first image (i.e. the first frequency channel), etc.

But in order to manipulate images (instead of rows of images), you would have to reshape the Spark dataframe. This is a huge bottleneck in spark-fits, as there is currently no easy solution... The most naive approach would be to assign an index to each dataframe row and group by this index, assuming each image of the cube has the same size. This is something I could add in spark-fits (returning the image index for each row as an option), but the grouping operation would add a (potentially large) overhead in execution time, as it involves data shuffling... Another approach would be to force spark-fits to return one image per dataframe row - but that would surely take some time to implement.
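To sketch that naive approach (nothing below is existing spark-fits API; df is a dataframe loaded by spark-fits, and rows_per_image assumes every image of the cube has the same height):

from pyspark.sql import functions as F

rows_per_image = 5694  # assumption: constant number of Dec rows per image

# attach a global row index, then derive the image index from it
indexed = (df.rdd.zipWithIndex()
             .map(lambda x: (x[0][0], x[1], int(x[1] / rows_per_image)))
             .toDF(['Image', 'row_index', 'img_index']))

# re-assemble one image, in order (the orderBy is where the costly shuffle happens)
rows = (indexed.filter(F.col('img_index') == 0)
               .orderBy('row_index')
               .select('Image')
               .collect())
image0 = [r[0] for r in rows]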

And is it possible to extract the actual RA, Dec and frequency values from the data frame, or does this need to be pulled from the FITS header?

Currently no, it needs to be pulled from the FITS header - spark-fits does use some of the header data internally, but it is not exposed to the user. I usually use an external library like astropy whenever I want to explore the header further. As a future improvement for spark-fits, I could imagine returning a dataframe with the header values.
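For the header itself, something along these lines with astropy usually does the job (the path is a placeholder, and the axis ordering depends on your cube):

from astropy.io import fits
from astropy.wcs import WCS

header = fits.getheader('/path/to/cube.fits', ext=0)
wcs = WCS(header)

# pixel -> world conversion, here assuming a 3D (RA, Dec, Freq) cube
ra, dec, freq = wcs.pixel_to_world_values(0, 0, 0)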

Julien


JulienPeloton commented on September 23, 2024

To be more specific, here is how spark-fits would currently load the data of a 3D cube:

# each dataframe row is one (frequency, Dec) slice; the 'Image' column holds the RA pixel values
df = spark.read.format('fits').option('hdu', 0).load('/path/to/cube.fits')


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

Thanks for the reply, it clarifies a lot for us. We'll potentially be looking at large numbers of FITS image files over 1 TB, so we're having to investigate how best to get the data to the end users. We're interested in SparkFits because it gives us a data frame we can save as a Parquet table on HDFS, which lets us start processing with Spark on YARN without having to extract the data from a FITS or spectral cube (which would have a large overhead doing the transformations for a multi-TB file). If we can configure the data with a Hive schema, we can then use Spark SQL to pull subsets of the data into a Spark-enabled Jupyter notebook for analysis.
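For example, roughly what we have in mind (the table and column names are illustrative, not a finished design):

# pull a subset of channels from the persisted table into a notebook data frame
subset = sqlContext.sql("""
    SELECT Image, channel
    FROM fits_pixels
    WHERE channel BETWEEN 100 AND 110
""")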

As you say, reshaping the SparkFits data frame into an easier format for end-users to manipulate is something of a challenge; we're investigating possibilities but at the moment no clear solution is presenting itself.

We'll keep you posted on progress. Also, please do let us know if we can test any feature for you.

Thanks again,

Geoff


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

Just a quick question - we've experimented with creating a partitioned Parquet table on Hive/HDFS to persist the data frame from SparkFits. The Parquet table is physically partitioned by channel and source filename, as we'll be storing multiple FITS files for analysis, and the smallest of these is going to be around 800+ GB. The channel value is added to the data frame, as is an index value for the rows. A fairly expensive operation, but it's only done once on load, and the data is then available via a Spark sqlContext call to create a data frame from the persisted data. Once the table is created, the extraction of subcube data is straightforward. All good so far.
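Roughly, the write and read-back steps look like this (paths and names simplified from our actual code):

# df_indexed is the SparkFits data frame with 'channel' and 'filename' columns added on load
(df_indexed.write
           .partitionBy('channel', 'filename')
           .parquet('/user/hive/warehouse/fits_cube'))

# later, any Spark session can read it back and slice out a subcube
df = sqlContext.read.parquet('/user/hive/warehouse/fits_cube')
subcube = df.where(df.channel == 42)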

However, if we extract a full channel (one image), it looks very much like we have an offset in the data. This is what the original image for one channel looks like:

[Image: ResidualFromFile - the channel as rendered from the original FITS file]

If we pull the image data from the Parquet table and reshape it to a 2D image, we get this:

[Image: ResidualFromParquet - the same channel read back from Parquet, visibly offset]

I'm not sure how or why this is happening, any ideas?

Addressing the positional data question, we're also pulling positional and spectral data from the FITS header and storing it as an additional dimension table - again, an expensive operation, but it's only done once.

Thanks again, Julien, appreciate your time.

Regards

Geoff


JulienPeloton commented on September 23, 2024

Hi @GeoffDuniam,

Thanks for reporting this issue!

Which version of spark-fits did you use? Do you see the same offset using 0.8.3 or 0.8.4 (the latest)?

Would it be possible to reproduce the problem on a small subset of the data and share it with me? It does not need to contain any physical information, and playing with the same data structure you are using would help me understand where the problem comes from.

Thanks!


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

No problem, I can take one channel of the data from one of the FITS files we're using - it'll be a 2D image, ~5500 x 5500 pixels.
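Something like this astropy cut should produce it (the paths are placeholders, and I'm assuming our (freq, pol, dec, ra) cube layout):

from astropy.io import fits

with fits.open('/path/to/residual.i.SB8170.cube.fits', memmap=True) as hdul:
    # keep a single frequency channel of the cube
    subcube = hdul[0].data[0:1, :, :, :]
    fits.writeto('/path/to/one_channel.fits', subcube, hdul[0].header, overwrite=True)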

However, it just occurred to me (and I've confirmed it by checking both the Jupyter kernel and the spark2-submit routines we're using) that we're still on 0.7.2. That being the case, before I get you to spend time checking this for us, it'd probably be worthwhile upgrading to 0.8.4 and reloading the data. Yes?

Thanks again, Julien, appreciate your time. Let me know if you want the data or you'd rather wait until we upload the data again using 0.8.4.

Cheers

Geoff


JulienPeloton commented on September 23, 2024

Hi @GeoffDuniam

Thanks for your feedback. I would suggest trying 0.8.4 first, as there were important fixes with respect to 0.7.2. Then, if the problem persists, I would be happy to check on a small dataset. Note I will be offline from July 5 until July 15.

Best,
Julien


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

Upgraded our package to 0.8.4, and we're still seeing the same result.

Probably good to give you some background - the FITS source we're using is one of two Stokes image cubes, with respective sizes of 307 and 835 GB. The cubes are nominally polarised 4-dimensional cubes (RA, Dec, Polarisation, Frequency), but they have only one level of polarisation, so they're effectively 3-dimensional. We're creating the RDD from the data frame with an index (return df.rdd.zipWithIndex()), and we can't use .glom() on files of this size because we run into memory limitations on the cluster (3 masters, ten workers, one edge node supporting Jupyterhub). The index is there to calculate the specific frequency bands so we can physically partition the data in a Parquet table, and it also lets us reference the declination values, which we pull out of the HDU.
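For instance, mapping a row index back to a declination under a simple linear-axis assumption (header, row_index and rows_per_channel being the HDU header, the row index column and the Dec size; a projected image should really go through astropy's WCS instead):

# linear FITS WCS: world = CRVAL + (pixel - CRPIX) * CDELT, with 1-based pixels
crval2 = header['CRVAL2']
crpix2 = header['CRPIX2']
cdelt2 = header['CDELT2']

dec_pixel = row_index % rows_per_channel  # 0-based Dec row within the image
dec_deg = crval2 + (dec_pixel + 1 - crpix2) * cdelt2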

I've done an image extraction from both the data frame and the data stored on the Parquet table and the image still has that offset. I'll generate a subset of the smaller FITS cube and get it to you.

Thanks again for the help, much appreciated.

Cheers

Geoff


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

I've got two FITS files that demonstrate the issue, one with two channels and one with five. Interestingly, the smaller two-channel file displays the images correctly, whereas the five-channel file shows the offset. I won't be able to attach them here - the smaller file is ~120 MB and we have a 10 MB limit. I could put them up on Dropbox and share them from there? Let me know.

Thanks again and regards,

Geoff


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

If it helps, I'll also give you the links to download the original FITS files from Pawsey - the residual SB8170 cube is the big one, at 835 GB. Both FITS files are public access.

https://data.pawsey.org.au/download/SKAvis/residual.i.SB8170.cube.fits


HTH. I'm getting the subcubes up now and I'll let you know when they're available.

Enjoy your break!

Cheers

Geoff


JulienPeloton commented on September 23, 2024

Hi @GeoffDuniam,

Thanks for the detailed explanations! Sharing the subcubes (the two-channel and five-channel files) via Dropbox is fine by me. Let me know.

Best,
Julien


GeoffDuniam commented on September 23, 2024

@JulienPeloton

Hi Julien,

No problem, I have them up on our Cloud Share here - drop me a mail with your address and I'll get the download link to you.

geoff dot duniam (at) gmail

Cheers

Geoff


JulienPeloton commented on September 23, 2024

Thanks @GeoffDuniam - thanks to the data you sent, I think I've understood the problem. Long story short: when writing data to disk, Spark shuffles the data blocks (which are different from the original images), and when loading the data back, the original order is lost. The solution is to attach the image index to each row when loading data with spark-fits. I will make this feature available in the next spark-fits release (hopefully by the end of the week).

Below is a detailed summary of the problem and its solution (using the data you sent):

from astropy.io import fits
import matplotlib.pyplot as plt
import numpy as np

Loading data with spark-fits

Let's load images stored in FITS using the spark-fits connector:

filename = '/path/to/Julien5.fits'
nfrequency = 5

df = spark.read.format('fits').option('hdu', 0).load(filename)

This file contains several images split into rows:

nrow = df.count()

msg = """
Total number of rows: {}
Number of frequencies: {}
Number of Dec measurements per frequency: {}
Number of RA measurements per frequency: {}
""".format(nrow, nfrequency, int(nrow/nfrequency), len(df.take(1)[0][0]))

print(msg)
Total number of rows: 28270
Number of frequencies: 5
Number of Dec measurements per frequency: 5654
Number of RA measurements per frequency: 5607
# Take the first channel
channel = df.select('Image').take(int(nrow/nfrequency))

# Converting Row into list
channel = [i[0] for i in channel]

# plot the image
plt.imshow(channel)

[Image: output_6_1, plt.imshow of the first channel read via spark-fits]

Comparison with standard FITS tools

Let's load the same image data with the standard FITS tools from astropy:

datafits = fits.open(filename)
print('(Frequency, 1, Dec, RA)')
print(datafits[0].data.shape)
(Frequency, 1, Dec, RA)
(5, 1, 5654, 5607)
plt.imshow(datafits[0].data[0][0])

[Image: output_9_1, plt.imshow of the first channel read via astropy]

maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(datafits[0].data[0][0])))
print('Difference between spark-fits and standard tools: {}'.format(maxdiff))
Difference between spark-fits and standard tools: 0.0

Good - there is no difference between the data read by spark-fits and astropy!

Saving in Parquet and reloading data

Let's now save the data read by spark-fits to disk in the Parquet format (we could actually save in any format available to Spark):

df.write.parquet(filename.replace('.fits', '.parquet'), compression='none')

Let's reload this data as-is, and plot the first image as we did previously:

df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '.parquet'))
# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow/nfrequency))

# Converting Row into list
channel2 = [i[0] for i in channel2]

# plot the image
plt.imshow(channel2)

[Image: output_16_1, plt.imshow of the first channel reloaded from Parquet - scrambled]

Oooooops! This time the image is different, as if it had been shifted or the data shuffled. What happened? To understand, one needs to look at how the data is distributed at each stage.

Original FITS data

The original FITS data is contained in the primary HDU. The cube is a 3D array (freq, dec, ra), and the data block is simply the concatenation of all the images - there is no ambiguity between different images.

spark-fits partitioning logic

When spark-fits reads data from disk, it pulls data in blocks (aka partitions). Each block size is determined by (1) the underlying storage system, if the data is already distributed, and/or (2) Spark internal constraints to optimise data I/O. But the blocks do not have to follow the logic of the data itself: each block contains a sequence of rows that may cover only part of an image, or span several images. Hence, there is no guarantee that the blocks created by spark-fits align with the original images. However, as long as the data sits in a single file, spark-fits still reads it in order (first the first row of the first image, then the second row of the first image, etc.), so the user still has the illusion of manipulating data ordered as in the original FITS file.

Saving data on disk, and reloading

When saving the data, each Spark mapper writes its data blocks to disk. Mappers are not ordered - the write operation is a competition between them! So the mapper holding the data for the first image is not guaranteed to write first (that's the beauty of distributed computation). Spark writes to disk all the blocks previously defined by spark-fits, but in complete disorder - very much like a puzzle. When the user re-reads the data from disk, the blocks themselves are intact, but their order is completely randomised.

Which solution then?

As is, once the data is reloaded there is no way to re-assemble it, as the order is lost during block creation. This is something I had not envisaged, as I was not really using images with spark-fits. The simplest solution is to add an image index to each row upon DataFrame creation with spark-fits, so that the user can repartition the data accordingly.

Fixing the partitioning (new in spark-fits 0.9.0)

We repartition the data so that each image sits in its own partition, save the (otherwise unordered) data to disk, and reload it with Spark. The ImgIndex column used below is only available in spark-fits 0.9+.

# repartition according to the image index
df_repartitioned = df.repartition('ImgIndex')

# Write on disk
df_repartitioned.write.parquet(filename.replace('.fits', '2.parquet'), compression='none')

# Reload
df_parquet = spark.read.format('parquet').load(filename.replace('.fits', '2.parquet'))

# Take the first channel
channel2 = df_parquet.select('Image').take(int(nrow/nfrequency))

# Converting Row into list
channel2 = [i[0] for i in channel2]

# plot the image
plt.imshow(channel2)

[Image: output_19_1, plt.imshow of the first channel after the repartitioned reload - correct again]

maxdiff = np.max(np.abs(np.nan_to_num(channel) - np.nan_to_num(channel2)))
print('Difference between fits and parquet: {}'.format(maxdiff))
Difference between fits and parquet: 0.0

Note that with this repartitioning, you can even manipulate each image separately using mapPartitions:

# 1 partition = 1 image
df_repartitioned.rdd.mapPartitions(image_processing_function)
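Here image_processing_function is not a spark-fits API - it stands for any user function taking an iterator of rows. A minimal sketch, assuming the rows of a partition arrive in Dec order:

import numpy as np

def image_processing_function(rows):
    # rebuild the 2D image from this partition's rows, then emit one result per image
    image = np.array([row['Image'] for row in rows])
    yield (image.shape, float(np.nanmean(image)))

results = df_repartitioned.rdd.mapPartitions(image_processing_function).collect()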

