square / blocks


Simple interface to read, organize, and manipulate structured data in files on local and cloud storage

Home Page: https://sq-blocks.readthedocs.io/en/latest/

License: Apache License 2.0

Python 100.00%
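
A minimal usage sketch, based on the description above and the assemble calls that appear in the issues below (the exact released API may differ):

import blocks

# Assemble every data file under a path (local or GCS) into one DataFrame
df = blocks.assemble('gs://bucket/path/to/data/')
print(df.shape)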

blocks's People

Contributors

baxen, capers, damienrj, dchristle, deekshachugh, dwyatte, stillmatic


blocks's Issues

Add parquet-specific dataset IO for reduced memory usage

Currently, blocks.assemble expects all datafile frames to be in memory simultaneously for merging (see the "# Concatenate all rgroups" section of assemble in the profiles below).

Unfortunately, there are some platform-dependent memory allocation issues, likely out of blocks' control, where Linux requires about 2x the expected memory for blocks.assemble (see the attached profiling).
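
For reference, traces like the two attached below are what a line-by-line memory profiler produces; a minimal way to reproduce one, assuming the memory_profiler package (its @profile decorator is visible in the output):

from memory_profiler import profile

@profile
def assemble_15gb():
    import blocks
    return blocks.assemble('/path/to/15gb/of/data/')

assemble_15gb()  # run with: python -m memory_profiler script.py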

One idea is to add a mode to blocks.assemble that would only hold two frames in memory at once (the partially assembled one and the next datafile frame to merge). It's unclear whether this would work around the underlying memory allocation issue on Linux, but it could be worth a shot and would generally improve memory usage.
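
A rough sketch of that mode, reusing the read_df reader visible in the profiles below; the surrounding structure (assemble_lowmem, the grouped dict) is illustrative, not the blocks API:

import pandas as pd

def assemble_lowmem(grouped, merge="inner", **read_args):
    # Hold at most two frames at once: the running result and the
    # next cgroup's frame.
    result = None
    for group, datafiles in grouped.items():
        frame = pd.concat(read_df(d, **read_args) for d in datafiles)
        # Fold the new frame into the result, then drop the extra
        # reference so the old intermediate can be freed.
        result = frame if result is None else result.merge(frame, how=merge)
        del frame
    return result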

Memory profile assembling 15 GB of data on Mac:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    30   86.297 MiB   86.297 MiB           1   @profile
    31                                         def assemble(
    32                                             path: str,
    33                                             cgroups: Optional[Sequence[cgroup]] = None,
    34                                             rgroups: Optional[Sequence[rgroup]] = None,
    35                                             read_args: Any = {},
    36                                             cgroup_args: Dict[cgroup, Any] = {},
    37                                             merge: str = "inner",
    38                                             filesystem: FileSystem = GCSFileSystem(),
    39                                         ) -> pd.DataFrame:
    74   86.438 MiB    0.141 MiB           1       grouped = _collect(path, cgroups, rgroups, filesystem)
    75                                         
    76                                             # ----------------------------------------
    77                                             # Concatenate all rgroups
    78                                             # ----------------------------------------
    79   86.438 MiB    0.000 MiB           1       frames = []
    80                                         
    81  731.676 MiB -3327.988 MiB           2       for group in grouped:
    82   86.438 MiB    0.000 MiB           1           datafiles = grouped[group]
    83   86.438 MiB    0.000 MiB           1           args = read_args.copy()
    84   86.438 MiB    0.000 MiB           1           if group in cgroup_args:
    85                                                     args.update(cgroup_args[group])
    86 4059.664 MiB -184418.691 MiB         403           frames.append(pd.concat(read_df(d, **args) for d in datafiles))
    87                                         
    88                                             # ----------------------------------------
    89                                             # Merge all cgroups
    90                                             # ----------------------------------------
    91  731.691 MiB    0.016 MiB           1       df = _merge_all(frames, merge=merge)
    92                                         
    93                                             # ----------------------------------------
    94                                             # Delete temporary files
    95                                             # ----------------------------------------
    96  731.746 MiB    0.016 MiB         201       for file in datafiles:
    97  731.746 MiB    0.039 MiB         200           if hasattr(file.handle, "name"):
    98                                                     tmp_file_path = file.handle.name
    99                                                     if os.path.exists(tmp_file_path):
   100                                                         os.remove(file.handle.name)
   101  731.754 MiB    0.008 MiB           1       return df

Memory profile assembling 15 GB of data on Linux:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    30  104.570 MiB  104.570 MiB           1   @profile
    31                                         def assemble(
    32                                             path: str,
    33                                             cgroups: Optional[Sequence[cgroup]] = None,
    34                                             rgroups: Optional[Sequence[rgroup]] = None,
    35                                             read_args: Any = {},
    36                                             cgroup_args: Dict[cgroup, Any] = {},
    37                                             merge: str = "inner",
    38                                             filesystem: FileSystem = GCSFileSystem(),
    39                                         ) -> pd.DataFrame:
    74  104.805 MiB    0.234 MiB           1       grouped = _collect(path, cgroups, rgroups, filesystem)
    75                                         
    76                                             # ----------------------------------------
    77                                             # Concatenate all rgroups
    78                                             # ----------------------------------------
    79  104.805 MiB    0.000 MiB           1       frames = []
    80                                         
    81 30356.867 MiB    0.000 MiB           2       for group in grouped:
    82  104.805 MiB    0.000 MiB           1           datafiles = grouped[group]
    83  104.805 MiB    0.000 MiB           1           args = read_args.copy()
    84  104.805 MiB    0.000 MiB           1           if group in cgroup_args:
    85                                                     args.update(cgroup_args[group])
    86 30356.867 MiB 30248.691 MiB         403           frames.append(pd.concat(read_df(d, **args) for d in datafiles))
    87                                         
    88                                             # ----------------------------------------
    89                                             # Merge all cgroups
    90                                             # ----------------------------------------
    91 30356.867 MiB    0.000 MiB           1       df = _merge_all(frames, merge=merge)
    92                                         
    93                                             # ----------------------------------------
    94                                             # Delete temporary files
    95                                             # ----------------------------------------
    96 30356.867 MiB    0.000 MiB         201       for file in datafiles:
    97 30356.867 MiB    0.000 MiB         200           if hasattr(file.handle, "name"):
    98                                                     tmp_file_path = file.handle.name
    99                                                     if os.path.exists(tmp_file_path):
   100                                                         os.remove(file.handle.name)
   101 30356.867 MiB    0.000 MiB           1       return df

Segmentation fault when using blocks

Has anyone experienced a segmentation fault when using blocks? This happens after downloading the files with gsutil; the files are parquet with snappy compression.

>>> import blocks
>>> df = blocks.assemble('/tmp/tmpu3ivvojp/')
Segmentation fault (core dumped)
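
One way to narrow this down (a hedged sketch, assuming the files can be read with pyarrow directly, since pyarrow is pandas' usual parquet engine and handles snappy natively):

import glob
import pyarrow.parquet as pq

# Read each downloaded file on its own to isolate the one that
# triggers the crash.
for path in sorted(glob.glob('/tmp/tmpu3ivvojp/*')):
    print('reading', path)
    pq.read_table(path)  # segfaults here on the offending file, if any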

GCSFileSystem Has Side Effects on MultiProcessing

Hello,

It seems that using GCSFileSystem prevents the blocks.assemble function call from finishing when run under multiprocessing. Please see the following example:

from concurrent.futures import ProcessPoolExecutor
import blocks
import pandas as pd
from typing import List

from blocks.filesystem import GCSFileSystem

def get_file_names(file_path: str) -> List[str]:
    full_names = GCSFileSystem().ls(file_path)[1:]  # exclude the folder name
    return full_names

def count_rows(file_name: str) -> int:
    df = blocks.assemble(file_name)
    return df.shape[0]


if __name__ == "__main__":
    # This code will never finish because we use GCSFileSystem first then call blocks.assemble later.
    MAX_WORKERS = 5
    input_path = "gs://path/to/parquet/files"

    names = get_file_names(input_path)
    print(names)
    
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = executor.map(count_rows, names)
        
    for cnt in results:
        print(cnt)

If we pass file names directly, without calling GCSFileSystem first, the code works:

if __name__ == "__main__":
    # If we don't call GCSFileSystem, the code works.
    MAX_WORKERS = 5

    names = ['file1.pq', 'file2.pq', 'file3.pq']
    print(names)
    
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = executor.map(count_rows, names)
        
    for cnt in results:
        print(cnt)

Any thoughts? Thank you!
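
A hedged workaround sketch for the hanging case above: use the 'spawn' start method so the worker processes start from a clean interpreter instead of forking whatever state GCSFileSystem set up in the parent. Whether this resolves the hang depends on what GCSFileSystem caches, so treat it as an experiment rather than a confirmed fix:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    names = get_file_names("gs://path/to/parquet/files")
    # 'spawn' avoids inheriting the parent's client/connection state
    ctx = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=5, mp_context=ctx) as executor:
        for cnt in executor.map(count_rows, names):
            print(cnt)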
