
Petastorm

Petastorm is an open source data access library developed at Uber ATG. This library enables single machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. Petastorm supports popular Python-based machine learning (ML) frameworks such as Tensorflow, PyTorch, and PySpark. It can also be used from pure Python code.

Documentation web site: https://petastorm.readthedocs.io

Installation

pip install petastorm

The petastorm package defines several optional extras that are not installed automatically: tf, tf_gpu, torch, opencv, docs, test.

For example, to install the GPU version of Tensorflow together with opencv, use the following pip command:

pip install petastorm[opencv,tf_gpu]

Generating a dataset

A dataset created using Petastorm is stored in Apache Parquet format. On top of a Parquet schema, petastorm also stores higher-level schema information that makes multidimensional arrays into a native part of a petastorm dataset.

Petastorm supports extensible data codecs. These enable a user to use one of the standard data compressions (jpeg, png) or implement their own.
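
As a hedged illustration only (not taken from the project's documentation), a custom codec might look roughly like the sketch below. It assumes the DataframeColumnCodec base class in petastorm.codecs with encode/decode/spark_dtype methods; verify these names against the version you have installed.

import io
import zlib

import numpy as np
from pyspark.sql.types import BinaryType

from petastorm.codecs import DataframeColumnCodec  # assumed base class


class ZlibNdarrayCodec(DataframeColumnCodec):
    """Illustrative codec: stores a numpy array as a zlib-compressed .npy blob."""

    def encode(self, unischema_field, value):
        # Serialize the array to an in-memory .npy buffer, then compress it.
        memfile = io.BytesIO()
        np.save(memfile, value)
        return bytearray(zlib.compress(memfile.getvalue()))

    def decode(self, unischema_field, value):
        # Reverse of encode: decompress and load the .npy buffer back into an array.
        return np.load(io.BytesIO(zlib.decompress(value)))

    def spark_dtype(self):
        # The encoded value is stored as a binary Spark column.
        return BinaryType()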

Generating a dataset is done using PySpark. PySpark natively supports Parquet format, making it easy to run on a single machine or on a Spark compute cluster. Here is a minimalistic example writing out a table with some random data.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The Unischema defines what the dataset looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap the dataset materialization portion. This takes care of setting up Spark environment variables
    # as well as saving petastorm-specific metadata.
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)

  • HelloWorldSchema is an instance of a Unischema object. Unischema is capable of rendering the types of its fields into different framework-specific formats, such as: Spark StructType, Tensorflow tf.DType and numpy numpy.dtype.
  • To define a dataset field, you need to specify a type, a shape, a codec instance and whether the field is nullable for each field of the Unischema.
  • We use PySpark for writing out the Parquet files. In this example, we launch PySpark on a local box (.master('local[2]')). Of course, for larger scale dataset generation we would need a real compute cluster.
  • We wrap the Spark dataset generation code with the materialize_dataset context manager. The context manager is responsible for configuring the row group size at the beginning and writing out petastorm-specific metadata at the end.
  • The row generating code is expected to return a Python dictionary indexed by field name. We use the row_generator function for that.
  • dict_to_spark_row converts the dictionary into a pyspark.Row object while ensuring HelloWorldSchema compliance (shape, type and is-nullable condition are tested).
  • Once we have a pyspark.DataFrame we write it out to Parquet storage. The Parquet schema is automatically derived from HelloWorldSchema.
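
For completeness, the generator above can be invoked directly; the output path is simply the default from the function signature:

if __name__ == '__main__':
    # Writes the example dataset to the default location used in the snippet above.
    generate_petastorm_dataset('file:///tmp/hello_world_dataset')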

Plain Python API

The petastorm.reader.Reader class is the main entry point for user code that accesses the data from an ML framework such as Tensorflow or Pytorch. The reader has multiple features such as:

  • Selective column readout
  • Multiple parallelism strategies: thread, process, single-threaded (for debug)
  • N-grams readout support
  • Row filtering (row predicates)
  • Shuffling
  • Partitioning for multi-GPU training
  • Local caching

Reading a dataset is simple using the petastorm.reader.Reader class, which can be created using the petastorm.make_reader factory method:

from petastorm import make_reader

with make_reader('hdfs://myhadoop/some_dataset') as reader:
    for row in reader:
        print(row)

hdfs://... and file://... are supported URL protocols.

Once a Reader is instantiated, you can use it as an iterator.
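
Several of the features listed above are exposed as arguments of the make_reader factory method. The sketch below is illustrative only; argument names such as schema_fields, num_epochs, cur_shard and shard_count reflect our reading of the API, so check them against the documentation of your installed version:

from petastorm import make_reader

# Illustrative sketch: read two columns only, iterate for 10 epochs,
# and consume shard 0 out of 4 (e.g. one of four training workers).
with make_reader('hdfs://myhadoop/some_dataset',
                 schema_fields=['id', 'image1'],
                 num_epochs=10,
                 cur_shard=0,
                 shard_count=4) as reader:
    for row in reader:
        pass  # feed `row` into the training loop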

Tensorflow API

To hook up the reader to a Tensorflow graph, you can use the tf_tensors function:

import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import tf_tensors

with make_reader('file:///some/localpath/a_dataset') as reader:
    row_tensors = tf_tensors(reader)
    with tf.Session() as session:
        for _ in range(3):
            print(session.run(row_tensors))

Alternatively, you can use the new tf.data.Dataset API:

import tensorflow as tf

from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_reader('file:///some/localpath/a_dataset') as reader:
    dataset = make_petastorm_dataset(reader)
    iterator = dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    with tf.Session() as sess:
        sample = sess.run(tensor)
        print(sample.id)

Pytorch API

As illustrated in pytorch_example.py, reading a petastorm dataset from pytorch can be done via the adapter class petastorm.pytorch.DataLoader, which allows a custom pytorch collating function and transforms to be supplied.

Be sure you have torch and torchvision installed:

pip install torchvision

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example.py:

import torch
from torchvision import transforms

from petastorm import make_reader, TransformSpec
from petastorm.pytorch import DataLoader

torch.manual_seed(1)
device = torch.device('cpu')
model = Net().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

def _transform_row(mnist_row):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    return (transform(mnist_row['image']), mnist_row['digit'])


transform = TransformSpec(_transform_row, removed_fields=['idx'])

with DataLoader(make_reader('file:///localpath/mnist/train', num_epochs=10,
                            transform_spec=transform, seed=1, shuffle_rows=True), batch_size=64) as train_loader:
    train(model, device, train_loader, 10, optimizer, 1)
with DataLoader(make_reader('file:///localpath/mnist/test', num_epochs=10,
                            transform_spec=transform), batch_size=1000) as test_loader:
    test(model, device, test_loader)

If you are working with very large batch sizes and do not need support for Decimal/string fields, we provide petastorm.pytorch.BatchedDataLoader, which can buffer using Torch tensors (cpu or cuda) with significantly higher throughput.
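
A minimal sketch of how BatchedDataLoader might be used is shown below. Apart from the reader and batch_size arguments, treat the details (including which reader factory to pair it with and the batch layout) as assumptions and consult the API documentation:

from petastorm import make_reader
from petastorm.pytorch import BatchedDataLoader

# Hedged sketch: batches are buffered as torch tensors rather than per-row tuples.
with BatchedDataLoader(make_reader('file:///localpath/mnist/train', num_epochs=1),
                       batch_size=1024) as train_loader:
    for batch in train_loader:
        pass  # e.g. images, digits = batch['image'], batch['digit'] (assumed field names)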

If your dataset fits into system memory, you can use an in-memory version, petastorm.pytorch.InMemBatchedDataLoader. This dataloader only reads the dataset once and caches the data in memory to avoid additional I/O across multiple epochs.

Spark Dataset Converter API

The Spark converter API simplifies data conversion from Spark to TensorFlow or PyTorch. The input Spark DataFrame is first materialized in Parquet format and then loaded as a tf.data.Dataset or torch.utils.data.DataLoader.

The minimalist example below assumes the definition of a compiled tf.keras model and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter
import tensorflow.compat.v1 as tf  # pylint: disable=import-error

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df = ... # `df` is a spark dataframe

# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter(df)

# make a tensorflow dataset from `converter`
with converter.make_tf_dataset() as dataset:
    # the `dataset` is `tf.data.Dataset` object
    # dataset transformation can be done if needed
    dataset = dataset.map(...)
    # we can train/evaluate model on the `dataset`
    model.fit(dataset)
    # when exiting the context, the reader of the dataset will be closed

# delete the cached files of the dataframe.
converter.delete()

The minimalist example below assumes the definition of a Net class and train and test functions, included in pytorch_example.py, and a Spark DataFrame containing a feature column followed by a label column.

from petastorm.spark import SparkDatasetConverter, make_spark_converter

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'hdfs:/...')

df_train, df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net()

# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter(df_train)
converter_test = make_spark_converter(df_test)

# make a pytorch dataloader from `converter_train`
with converter_train.make_torch_dataloader() as dataloader_train:
    # the `dataloader_train` is `torch.utils.data.DataLoader` object
    # we can train model using the `dataloader_train`
    train(model, dataloader_train, ...)
    # when exiting the context, the reader of the dataset will be closed

# the same for `converter_test`
with converter_test.make_torch_dataloader() as dataloader_test:
    test(model, dataloader_test, ...)

# delete the cached files of the dataframes.
converter_train.delete()
converter_test.delete()

Analyzing petastorm datasets using PySpark and SQL

A Petastorm dataset can be read into a Spark DataFrame using PySpark, where you can use a wide range of Spark tools to analyze and manipulate the dataset.

# Create a dataframe object from a parquet file
dataframe = spark.read.parquet(dataset_url)

# Show a schema
dataframe.printSchema()

# Count all
dataframe.count()

# Show a single column
dataframe.select('id').show()

SQL can be used to query a Petastorm dataset:

spark.sql(
   'SELECT count(id) '
   'from parquet.`file:///tmp/hello_world_dataset`').collect()

You can find a full code sample here: pyspark_hello_world.py.

Non-Petastorm Parquet Stores

Petastorm can also be used to read data directly from Apache Parquet stores. To achieve that, use make_batch_reader (and not make_reader). The following table summarizes the differences between the make_reader and make_batch_reader functions.

make_reader | make_batch_reader
-- | --
Only Petastorm datasets (created using materialize_dataset) | Any Parquet store (some native Parquet column types are not supported yet)
The reader returns one record at a time. | The reader returns batches of records. The size of a batch is not fixed; it is determined by the Parquet row-group size.
Predicates passed to make_reader are evaluated per single row. | Predicates passed to make_batch_reader are evaluated per batch.
Can filter the Parquet store based on the filters argument. | Can filter the Parquet store based on the filters argument.
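
A minimal sketch of reading an existing, non-Petastorm Parquet store with make_batch_reader (the dataset URL is a placeholder):

from petastorm import make_batch_reader

with make_batch_reader('hdfs://myhadoop/some_parquet_store') as reader:
    for batch in reader:
        # Each `batch` covers up to a row group's worth of rows; its fields
        # are numpy arrays, one per Parquet column.
        print(batch)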

Troubleshooting

See the Troubleshooting page and please submit a ticket if you can't find an answer.

See also

  1. Gruener, R., Cheng, O., and Litvin, Y. (2018) Introducing Petastorm: Uber ATG's Data Access Library for Deep Learning. URL: https://eng.uber.com/petastorm/
  2. QCon.ai 2019: "Petastorm: A Light-Weight Approach to Building ML Pipelines".

How to Contribute

We prefer to receive contributions in the form of GitHub pull requests. Please send pull requests against the github.com/uber/petastorm repository.

  • If you are looking for some ideas on what to contribute, check out github issues and comment on the issue.
  • If you have an idea for an improvement, or you'd like to report a bug but don't have time to fix it, please create a GitHub issue.

To contribute a patch:

  • Break your work into small, single-purpose patches if possible. It's much harder to merge in a large change with a lot of disjoint features.
  • Submit the patch as a GitHub pull request against the master branch. For a tutorial, see the GitHub guides on forking a repo and sending a pull request.
  • Include a detailed description of the proposed change in the pull request.
  • Make sure that your code passes the unit tests. You can find instructions on how to run the unit tests here.
  • Add new unit tests for your code.

Thank you in advance for your contributions!

See the Development page for development-related information.


petastorm's Issues

Reader v2 implementation

This implementation gets rid of the custom workers_pool and uses the standard Python concurrent.futures. It also supports shuffling compressed data.

Chase down haphazard core dump when running mnist example main

When running the mnist main, about 2 out of every 3 runs fail with a core dump, typically during the Reader open phase (before training begins). Once, in my latest run, the seg fault occurred at the end of the first train epoch but before the first test batch, so it's very likely still during Reader construction.

The core dump occurs in data page release within pyarrow libparquet:

#0  0x00007f1b48f9db48 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() () from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#1  0x00007f1b48febe73 in std::_Sp_counted_ptr_inplace<parquet::DataPage, std::allocator<parquet::DataPage>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#2  0x00007f1b48f9db29 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() () from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#3  0x00007f1b48fc520f in parquet::internal::TypedRecordReader<parquet::DataType<(parquet::Type::type)1> >::ReadNewPage() ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#4  0x00007f1b48fc5ba0 in parquet::internal::TypedRecordReader<parquet::DataType<(parquet::Type::type)1> >::ReadRecords(long) ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#5  0x00007f1b48f9b676 in parquet::arrow::PrimitiveImpl::NextBatch(long, std::shared_ptr<arrow::Array>*) () from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#6  0x00007f1b48f96fae in parquet::arrow::ColumnReader::NextBatch(long, std::shared_ptr<arrow::Array>*) () from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#7  0x00007f1b48f97bcb in parquet::arrow::FileReader::Impl::ReadColumnChunk(int, int, std::shared_ptr<arrow::Array>*) ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#8  0x00007f1b48f9810f in parquet::arrow::FileReader::Impl::ReadRowGroup(int, std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const () from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#9  0x00007f1b48f98f20 in parquet::arrow::FileReader::Impl::ReadRowGroup(int, std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*) ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#10 0x00007f1b48f994c2 in parquet::arrow::FileReader::ReadRowGroup(int, std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*) ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/libparquet.so.1
#11 0x00007f1b47de8cc7 in __pyx_pw_7pyarrow_8_parquet_13ParquetReader_7read_row_group(_object*, _object*, _object*) ()
   from /usr/local/lib/python2.7/dist-packages/pyarrow/_parquet.so
#12 0x00000000004cdea9 in do_call (nk=<optimized out>, na=<optimized out>, pp_stack=0x7f1b1493ea70, 
    func=<built-in method read_row_group of pyarrow._parquet.ParquetReader object at remote 0x7f1b484fcb50>) at ../Python/ceval.c:4235
#13 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493ea70) at ../Python/ceval.c:4043
#14 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b45f92250, for file /usr/local/lib/python2.7/dist-packages/pyarrow/parquet.py, line 125, in read_row_group (self=<ParquetFile(common_metadata=None, _nested_paths_by_prefix={'digit': [0], 'image': [2], 'idx': [1]}, reader=<pyarrow._parquet.ParquetReader at remote 0x7f1b484fcb50>) at remote 0x7f1b45f7c990>, i=0, columns=set(['digit', 'image', 'idx']), nthreads=1, use_pandas_metadata=False, column_indices=[0, 2, 1]), throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#15 0x00000000004704ea in PyEval_EvalCodeEx (closure=<optimized out>, defcount=<optimized out>, defs=0x7f1b4858fd88, kwcount=<optimized out>, kws=<optimized out>, 
    argcount=<optimized out>, args=<optimized out>, locals=0x0, globals=<optimized out>, co=<optimized out>) at ../Python/ceval.c:3252
#16 function_call.15337 (func=<optimized out>, arg=<optimized out>, kw=<optimized out>) at ../Objects/funcobject.c:526
#17 0x00000000004c9aa5 in PyObject_Call (kw={'use_pandas_metadata': False, 'nthreads': 1, 'columns': set(['digit', 'image', 'idx'])}, 
    arg=(<ParquetFile(common_metadata=None, _nested_paths_by_prefix={'digit': [0], 'image': [2], 'idx': [1]}, reader=<pyarrow._parquet.ParquetReader at remote 0x7f1b484fcb50>) at remote 0x7f1b45f7c990>, 0), func=<function at remote 0x7f1b485948c0>) at ../Objects/abstract.c:2529
#18 ext_do_call (nk=<optimized out>, na=<optimized out>, flags=<optimized out>, pp_stack=0x7f1b1493ecb0, func=<function at remote 0x7f1b485948c0>) at ../Python/ceval.c:4333
#19 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b10007080, for file /usr/local/lib/python2.7/dist-packages/pyarrow/parquet.py, line 459, in read (self=<ParquetDatasetPiece(path='/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet', partition_keys=[], row_group=0) at remote 0x7f1b0f71e190>, columns=set(['digit', 'image', 'idx']), nthreads=1, partitions=<ParquetPartitions(levels=[], partition_names=set([])) at remote 0x7f1b0f71e090>, open_file_func=<function at remote 0x7f1b45f77ed8>, file=None, use_pandas_metadata=False, reader=<ParquetFile(common_metadata=None, _nested_paths_by_prefix={'digit': [0], 'image': [2], 'idx': [1]}, reader=<pyarrow._parquet.ParquetReader at remote 0x7f1b484fcb50>) at remote 0x7f1b45f7c990>, options={'use_pandas_metadata': False, 'nthreads': 1, 'columns': set(...)}), throwflag=throwflag@entry=0) at ../Python/ceval.c:2705
#20 0x00000000004cfedc in PyEval_EvalCodeEx (co=0x7f1b48575030, globals=<optimized out>, locals=<optimized out>, args=<optimized out>, argcount=<optimized out>, 
    kws=<optimized out>, kwcount=3, defs=0x7f1b4857ec98, defcount=6, closure=0x0) at ../Python/ceval.c:3252
#21 0x00000000004c8314 in fast_function (nk=<optimized out>, na=1, n=<optimized out>, pp_stack=0x7f1b1493eed0, func=<function at remote 0x7f1b48599320>) at ../Python/ceval.c:4116
#22 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493eed0) at ../Python/ceval.c:4041
#23 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b10006e30, for file /home/ocheng/dev/petastorm/petastorm/reader_worker.py, line 191, in _read_with_shuffle_row_drop (self=<ReaderWorker(_split_pieces=[<ParquetDatasetPiece(path='/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet', partition_keys=[], row_group=0) at remote 0x7f1b0f71e190>], publish_func=<function at remote 0x7f1b45f77c08>, _sequence=None, args=('file:///home/ocheng/dev/datasets/mnist/test', <Unischema(digit=<UnischemaField at remote 0x7f1b45fb0e88>, _namedtuple=None, idx=<UnischemaField at remote 0x7f1b45fb0ef0>, image=<UnischemaField at remote 0x7f1b45fb0f58>, _name='MnistSchema', _fields=<OrderedDict(_OrderedDict__root=[[[[[...], [...], 'digit'], [...], 'idx'], [...], 'image'], [...], None], _OrderedDict__map={'digit': [...], 'image': [...], 'idx': [...]}) at remote 0x7f1b46353ab8>) at remote 0x7f1b45f7cd10>, None, [...], <NullCache at remote 0x7f1b45f7c950>, None), _schema=<...>, _dataset=<ParquetDataset(paths='/home/ocheng/dev/datas...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#24 0x00000000004cfedc in PyEval_EvalCodeEx (co=0x7f1b48543830, globals=<optimized out>, locals=<optimized out>, args=<optimized out>, argcount=<optimized out>, 
    kws=<optimized out>, kwcount=0, defs=0x0, defcount=0, closure=0x0) at ../Python/ceval.c:3252
---Type <return> to continue, or q <return> to quit---
#25 0x00000000004c9419 in fast_function (nk=<optimized out>, na=5, n=5, pp_stack=0x7f1b1493f0f0, func=<function at remote 0x7f1b4855c410>) at ../Python/ceval.c:4116
#26 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493f0f0) at ../Python/ceval.c:4041
#27 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b10006c20, for file /home/ocheng/dev/petastorm/petastorm/reader_worker.py, line 122, in _load_rows (self=<ReaderWorker(_split_pieces=[<ParquetDatasetPiece(path='/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet', partition_keys=[], row_group=0) at remote 0x7f1b0f71e190>], publish_func=<function at remote 0x7f1b45f77c08>, _sequence=None, args=('file:///home/ocheng/dev/datasets/mnist/test', <Unischema(digit=<UnischemaField at remote 0x7f1b45fb0e88>, _namedtuple=None, idx=<UnischemaField at remote 0x7f1b45fb0ef0>, image=<UnischemaField at remote 0x7f1b45fb0f58>, _name='MnistSchema', _fields=<OrderedDict(_OrderedDict__root=[[[[[...], [...], 'digit'], [...], 'idx'], [...], 'image'], [...], None], _OrderedDict__map={'digit': [...], 'image': [...], 'idx': [...]}) at remote 0x7f1b46353ab8>) at remote 0x7f1b45f7cd10>, None, [...], <NullCache at remote 0x7f1b45f7c950>, None), _schema=<...>, _dataset=<ParquetDataset(paths='/home/ocheng/dev/datasets/mnist/test', ...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#28 0x00000000004c8762 in fast_function (nk=<optimized out>, na=<optimized out>, n=4, pp_stack=0x7f1b1493f270, func=<function at remote 0x7f1b4855c320>) at ../Python/ceval.c:4106
#29 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493f270) at ../Python/ceval.c:4041
#30 PyEval_EvalFrameEx (f=f@entry=Frame 0x7f1b45fb89b0, for file /home/ocheng/dev/petastorm/petastorm/reader_worker.py, line 104, in <lambda> (), throwflag=throwflag@entry=0)
    at ../Python/ceval.c:2666
#31 0x00000000004cfedc in PyEval_EvalCodeEx (co=0x7f1b4853e530, globals=<optimized out>, locals=<optimized out>, args=<optimized out>, argcount=<optimized out>, 
    kws=<optimized out>, kwcount=0, defs=0x0, defcount=0, 
    closure=(<cell at remote 0x7f1b45f74b08>, <cell at remote 0x7f1b45f74948>, <cell at remote 0x7f1b45f74bb0>, <cell at remote 0x7f1b45f74a28>)) at ../Python/ceval.c:3252
#32 0x00000000004c9419 in fast_function (nk=<optimized out>, na=0, n=0, pp_stack=0x7f1b1493f490, func=<function at remote 0x7f1b45f77a28>) at ../Python/ceval.c:4116
#33 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493f490) at ../Python/ceval.c:4041
#34 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b45f9ce50, for file /home/ocheng/dev/petastorm/petastorm/cache.py, line 36, in get (self=<NullCache at remote 0x7f1b45f7c950>, key='d6aff736faf6f8954553f8bed1c01cf1:/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet:0', fill_cache_func=<function at remote 0x7f1b45f77a28>), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#35 0x00000000004c8762 in fast_function (nk=<optimized out>, na=<optimized out>, n=3, pp_stack=0x7f1b1493f610, func=<function at remote 0x7f1b4859f5f0>) at ../Python/ceval.c:4106
#36 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493f610) at ../Python/ceval.c:4041
#37 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b10000dc0, for file /home/ocheng/dev/petastorm/petastorm/reader_worker.py, line 104, in process (self=<ReaderWorker(_split_pieces=[<ParquetDatasetPiece(path='/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet', partition_keys=[], row_group=0) at remote 0x7f1b0f71e190>], publish_func=<function at remote 0x7f1b45f77c08>, _sequence=None, args=('file:///home/ocheng/dev/datasets/mnist/test', <Unischema(digit=<UnischemaField at remote 0x7f1b45fb0e88>, _namedtuple=None, idx=<UnischemaField at remote 0x7f1b45fb0ef0>, image=<UnischemaField at remote 0x7f1b45fb0f58>, _name='MnistSchema', _fields=<OrderedDict(_OrderedDict__root=[[[[[...], [...], 'digit'], [...], 'idx'], [...], 'image'], [...], None], _OrderedDict__map={'digit': [...], 'image': [...], 'idx': [...]}) at remote 0x7f1b46353ab8>) at remote 0x7f1b45f7cd10>, None, [...], <NullCache at remote 0x7f1b45f7c950>, None), _schema=<...>, _dataset=<ParquetDataset(paths='/home/ocheng/dev/datasets/mnist/test', com...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#38 0x00000000004704ea in PyEval_EvalCodeEx (closure=<optimized out>, defcount=<optimized out>, defs=0x7f1b10000f38, kwcount=<optimized out>, kws=<optimized out>, 
    argcount=<optimized out>, args=<optimized out>, locals=0x0, globals=<optimized out>, co=<optimized out>) at ../Python/ceval.c:3252
#39 function_call.15337 (func=<optimized out>, arg=<optimized out>, kw=<optimized out>) at ../Objects/funcobject.c:526
#40 0x00000000004c9aa5 in PyObject_Call (kw={'worker_predicate': None, 'shuffle_row_drop_partition': (0, 1), 'piece_index': 0}, 
    arg=(<ReaderWorker(_split_pieces=[<ParquetDatasetPiece(path='/home/ocheng/dev/datasets/mnist/test/part-00000-c04ef970-dd95-44b3-8ca6-0f6d4cbf321e-c000.parquet', partition_keys=[], row_group=0) at remote 0x7f1b0f71e190>], publish_func=<function at remote 0x7f1b45f77c08>, _sequence=None, args=('file:///home/ocheng/dev/datasets/mnist/test', <Unischema(digit=<UnischemaField at remote 0x7f1b45fb0e88>, _namedtuple=None, idx=<UnischemaField at remote 0x7f1b45fb0ef0>, image=<UnischemaField at remote 0x7f1b45fb0f58>, _name='MnistSchema', _fields=<OrderedDict(_OrderedDict__root=[[[[[...], [...], 'digit'], [...], 'idx'], [...], 'image'], [...], None], _OrderedDict__map={'digit': [...], 'image': [...], 'idx': [...]}) at remote 0x7f1b46353ab8>) at remote 0x7f1b45f7cd10>, None, [...], <NullCache at remote 0x7f1b45f7c950>, None), _schema=<...>, _dataset=<ParquetDataset(paths='/home/ocheng/dev/datasets/mnist/test', common_metadata=<pyarrow._parquet.FileMetaData at remote 0x7f1b484fcaf8>, fs=<LocalFileSystem at remote 0x7f1b4a066...(truncated), 
    func=<function at remote 0x7f1b4855c2a8>) at ../Objects/abstract.c:2529
#41 ext_do_call (nk=<optimized out>, na=<optimized out>, flags=<optimized out>, pp_stack=0x7f1b1493f850, func=<function at remote 0x7f1b4855c2a8>) at ../Python/ceval.c:4333
#42 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b4613a810, for file /home/ocheng/dev/petastorm/petastorm/workers_pool/thread_pool.py, line 60, in run (self=<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2705
#43 0x00000000004c8762 in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7f1b1493f9d0, func=<function at remote 0x7f1b484f2c80>) at ../Python/ceval.c:4106
---Type <return> to continue, or q <return> to quit---
#44 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493f9d0) at ../Python/ceval.c:4041
#45 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b08000910, for file /usr/lib/python2.7/threading.py, line 810, in __bootstrap_inner (self=<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#46 0x00000000004c8762 in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7f1b1493fb50, func=<function at remote 0x7f1ba6334398>) at ../Python/ceval.c:4106
#47 call_function (oparg=<optimized out>, pp_stack=0x7f1b1493fb50) at ../Python/ceval.c:4041
#48 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7f1b45f9cad0, for file /usr/lib/python2.7/threading.py, line 783, in __bootstrap (self=<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-...(truncated), 
    throwflag=throwflag@entry=0) at ../Python/ceval.c:2666
#49 0x00000000004704ea in PyEval_EvalCodeEx (closure=<optimized out>, defcount=<optimized out>, defs=0x0, kwcount=<optimized out>, kws=<optimized out>, argcount=<optimized out>, 
    args=<optimized out>, locals=0x0, globals=<optimized out>, co=<optimized out>) at ../Python/ceval.c:3252
#50 function_call.15337 (func=func@entry=<function at remote 0x7f1ba63342a8>, 
    arg=arg@entry=(<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], rel...(truncated), kw=kw@entry=0x0)
    at ../Objects/funcobject.c:526
#51 0x00000000004d8194 in PyObject_Call (kw=0x0, 
    arg=(<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], rel...(truncated), 
    func=<function at remote 0x7f1ba63342a8>) at ../Objects/abstract.c:2529
#52 instancemethod_call.8802 (func=<function at remote 0x7f1ba63342a8>, func@entry=<instancemethod at remote 0x7f1b45f7b190>, 
    arg=(<WorkerThread(_ventilator_queue=<Queue(unfinished_tasks=1, queue=<collections.deque at remote 0x7f1b45faa980>, maxsize=0, all_tasks_done=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e290>, mutex=<thread.lock at remote 0x7f1b45f75510>, not_full=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f1b45f75510>) at remote 0x7f1b0f71e250>, not_empty=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f1b45f75510>, acquire=<built-in method acquire of thread.lock object at remote 0x7f1b45f75510>, _Condition__waiters=[], rel...(truncated), arg@entry=(), kw=0x0)
    at ../Objects/classobject.c:2602
#53 0x00000000004d40fb in PyObject_Call (kw=<optimized out>, arg=(), func=<instancemethod at remote 0x7f1b45f7b190>) at ../Objects/abstract.c:2529
#54 PyEval_CallObjectWithKeywords (func=<instancemethod at remote 0x7f1b45f7b190>, arg=(), kw=<optimized out>) at ../Python/ceval.c:3889
#55 0x000000000057f3a2 in t_bootstrap.71638 (boot_raw=0x462e8b0) at ../Modules/threadmodule.c:614
#56 0x00007f1bbd621184 in start_thread (arg=0x7f1b14940700) at pthread_create.c:312
#57 0x00007f1bbd34e37d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Unify namedtuple/dict way samples are treated in Reader implementation

Currently we use both namedtuple and dictionary representations interchangeably. This results in a lot of back-and-forth conversions and makes the code more cumbersome.

Since the namedtuple is essentially syntactic sugar for Reader users, we should make all internal handling of rows use dictionaries (since a dictionary is easier to work with for code parameterized by field names).

Investigate x2 drop in throughput when reading data via `tf_tensors`

Throughput does not scale that well when reading via tf_tensors instead of a pure Python next(reader). I don't think it was always an issue. Here is an example of measurement results using the petastorm-throughput.py utility:


# cpus | CPU usage | RAM (Mb) | full image 1920*1200/sec | petastorm_backend
-- | -- | -- | -- | --
3 | 298% | 3000 | 23.82 | numpy
4 | 402% | 4205 | 36.54 | numpy
5 | 510% | 4966 | 43.9 | numpy
6 | 605% | 5314 | 55.54 | numpy
7 | 717% | 6166 | 71.84 | numpy
8 | 812% | 7380 | 79.05 | numpy
9 | 922% | 8548 | 81.08 | numpy
10 | 1008% | 8607 | 89.73 | numpy
  |   |   |   |  
3 | 370% | 6354 | 26.44 | tf
4 | 474% | 7597 | 31.96 | tf
5 | 588% | 10802 | 38.42 | tf
6 | 700% | 11360 | 39.21 | tf
7 | 784% | 14538 | 40.46 | tf
8 | 893% | 15953 | 41.95 | tf
9 | 1004% | 18438.7 | 39.97 | tf
10 | NA | NA | NA | tf

Querying a single column with a predicate on the same column will not return any results

The following pseudo code would fail:

reader = Reader([SomeSchema.a], predicate=in_lambda([SomeSchema.a], lambda a: True),...)
reader.fetch()

It will never find any results.

I think the reason is due to this piece of code in the Reader.py:

# Read remaining columns
other_rows = piece.read(
    open_file_func=lambda filepath: file,
    columns=other_column_names,
    partitions=self._dataset.partitions).to_pandas().to_dict('records')

# Remove rows that were filtered out by the predicate
filtered_other_rows = [row for i, row in enumerate(other_rows) if match_predicate_mask[i]]

# Decode remaining columns
decoded_other_rows = [_decode_row(row, self._schema) for row in filtered_other_rows]

# Merge predicate needed columns with the remaining
all_cols = [_merge_two_dicts(a, b) for a, b in zip(decoded_other_rows, filtered_decoded_predicate_rows)]

other_rows will be [] since no columns were loaded. This results in zip(decoded_other_rows, filtered_decoded_predicate_rows) returning an empty list [], even though filtered_decoded_predicate_rows could be an array of [True, True, ...].

tf_tensors would fail when reading a dataset with non-int32 fields and the ProcessPool(pyarrow_serialize=True) implementation is used

ProcessPool uses pyarrow.serialize. However, it would map all integer fields to int32 type. Tensorflow requires exact types to be returned by a pyfunc, hence it will fail.

We did not see this issue since we are not covering the combination of ProcessPool and tf_tensors in test_tf_utils.py.

ProcessPool is going to be replaced when we fully adopt Reader v2, so we probably do not have to fix this issue. Opening it for awareness.

The list of column names passed to in_lambda has to contain strings, which is confusing

For example, today this is right:

in_lambda([SomeSchema.field.name], lambda x: ...)

But this would fail silently:

in_lambda([SomeSchema.field], lambda x: ...)

I think we should support the second form, and maybe give a 'deprecated' warning if someone uses the first approach so we don't break other people's code. We can remove the deprecated form later.

Flaky Travis CI execution of Reader tests using ThreadPool

A couple of examples of such runs:

petastorm/tests/test_end_to_end.py::test_simple_read[reader_factory1] /home/travis/.travis/job_stages: line 78:  2987 Segmentation fault      (core dumped) pytest -v --ignore=petastorm/tests/test_pytorch_utils.py --ignore=examples/mnist/tests/test_generate_mnist_dataset.py --cov=./

Benchmarking tools

Create a set of benchmarking tools:

  • Command line utility to measure samples/sec read rate
  • Support creating notebooks with performance measurements.

Support `tf.data.Dataset` interface

Modern TF code uses the tf.data.Dataset object to read and preprocess data. Today we output raw tensors via tf_tensors. Returning a tf.data.Dataset would make the reader more user friendly.

(Some code already exists internally. TODO: bring it in and polish)

Unnecessary dependency on OpenCV

I'm trying to export a non-image spark dataset to the petastorm format, and when I import the petastorm codecs, I get this:

~/.conda/envs/stb2/lib/python3.5/site-packages/petastorm/codecs.py in <module>()
     25 from io import BytesIO
     26 
---> 27 import cv2
     28 import numpy as np
     29 from pyspark.sql.types import BinaryType, LongType, IntegerType, ShortType, ByteType, StringType

ImportError: No module named 'cv2'

It doesn't make sense to keep opencv as a hard dependency; it seems unnecessary.

Use `pyarrow.filesystem.LocalFileSystem`'s `rm` implementation when it becomes available

add_to_dataset_metadata deletes the ._common_metadata.crc file to avoid a mismatch between the _common_metadata content and its .crc (since add_to_dataset_metadata modifies _common_metadata).

The .crc file is deleted using os.remove since pyarrow.filesystem.LocalFileSystem has rm unimplemented.

Once the implementation is in place on the pyarrow side (we may consider contributing the fix), we should remove our os.remove hack.

Implement column bundling

Helps reduce the number of columns in our stores, as we are running into some Parquet implementation limitations.

Add `progress` property to the reader

In some scenarios, we want to know where we are in terms of epoch progress.

Let's add epoch_progress property to the Reader. The value of epoch_progress would be a real number indicating the epoch progress. For example, epoch_progress==2.33 would mean that we are 33% into the third epoch.

The calculation can not be exact on the row level, but we can make an estimate on a row-group level.
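
A purely hypothetical illustration of the row-group-level estimate described above (no such helper exists in the codebase; the function name and inputs are made up for this issue):

def estimate_epoch_progress(row_groups_read, row_groups_per_epoch):
    """Hypothetical helper: a value of 2.33 means we are ~33% into the third epoch."""
    return row_groups_read / float(row_groups_per_epoch)

# Example: after reading 7 row groups of a 3-row-group dataset -> ~2.33
print(estimate_epoch_progress(7, 3))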

create_schema_view fails if both Unischema and UnischemaField were previously pickled

The following snippet shows that, in the presence of pyspark's version of cloudpickle, pickling an object derived from namedtuple will lose all of its attributes.

import pickle
from collections import namedtuple

from pyspark import cloudpickle   # << Commenting this out fixes the issue

class MyStruct(namedtuple('MyStruct', ['name'])):

    def foo(self):
        print('foo')
        return 1

# foo works before pickling
a = MyStruct('a')
a.foo()

repickled_a = pickle.loads(pickle.dumps(a))
# foo attribute is missing after depickling
repickled_a.foo()

In our case, if a UnischemaField and a Unischema (with UnischemaFields) were pickled, Unischema.create_schema_view will fail because the custom UnischemaField.__eq__ is lost.

Some ideas:

  • Don't need to have UnischemaField as an object inherited from namedtuple. Would need to think about migrating data from existing datasets.
  • Go even further and use prototex or other format to store Unischema without code. Also requires migration.

Guarantee correct order of initialization of LocalFilesystemCache and ProcessExecutorPool

Today, a cache object is initialized and passed to a Reader. This is not safe for the following reason:
A ProcessPoolExecutor is started after the cache is instantiated and has opened an sqlite3 connection. Forking a process with open sqlite3 connections is not safe and is discouraged by the documentation. We see that the forked process might get stuck on some sqlite3 mutex.
This results in our test suite hanging and not being able to finish. Of course, this is also a real problem from a user perspective.

Ideas:

  • We plan the following two changes:
  1. Deprecate Reader v1
  2. Create a 'factory' layer that would be part of Petastorm and would be responsible for creation of the Reader and its components.

Once this happens, we will be able to control the order of ProcessPoolExecutor and Cache initialization, and hence guarantee that the order is correct. Together with documenting this possible failure mode for users who decide not to use the Petastorm-supplied factory, this solution should be sufficient.

Define field sunset mechanism

When we want to retire a field, we want to do it in stages: at first, a user should get a warning that the field is going to be retired; only then should it disappear (maybe with an explicit message that the field was retired, plus some info for the user).

Implement a tensorflow-native interface to the petastorm reader

In its current state, the pattern for reading data into Tensorflow is:

reader = Reader(...)

readout_examples = reader.tf_tensors()

with tf.Session() as sess:
  ...
reader.stop()
reader.join()

Can we make the usage pattern have a more Tensorflow-native flavor?

readout_examples = dataset_toolkit_reader(...)

with tf.Session() as sess:
  ...

To do so:

  • Thread termination should be controlled by tf.train.Coordinator.
  • Question: who should be allocating/deallocating the Reader object, and when?

Investigate a better way to handle numpy incompatibility

pyarrow is compiled against a newer incompatible version of numpy than we use internally. This makes installing dependencies a bit trickier and for now we are just upgrading the numpy version in the travis ci script. There is likely a better way to handle this, either by installing pyarrow from source or upgrading the numpy version we depend on.

Support running custom python code in a decoding thread/process

We should give the user an option to run a custom Python-implemented transform on the decoding thread, right after our decoder. This can be useful in the following scenarios:

  • Tensorflow does not provide a good way to scale Python preprocessing (since py_func will run entirely under GIL). This could be a good place for Tensorflow developers to place such preprocessing code.
  • PyTorch's DataLoader supports running custom code in subprocesses. We don't have this functionality in our implementation of the DataLoader.

An idea of an API:

Reader(..., postprocess_fn=lambda row: f(row))

A race condition results in get_results never returning

To reproduce:
add a sleep(2) in ventilator.py

            item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
            sleep(2)
            self._ventilate_fn(**item_to_ventilate)
            self._current_item_to_ventilate += 1
            self._ventilated_items_count += 1

and run petastorm/workers_pool/tests/test_ventilator.py. The test will fail.

A *pool mechanism can not differentiate between "we processed all items" and "no items were ever ventilated".

Not sure we need to fix this, due to the transition to Reader v2, which uses standard Python pool executors.
tagging: @rgruener

Make hdfs tests safe to run in parallel

The following tests fail when running in parallel since they rely on global state (os.environ):
EnvBasedHadoopConfigurationTest.test_env_hadoop_prefix_only
EnvBasedHadoopConfigurationTest.test_unmatched_env_var
EnvBasedHadoopConfigurationTest.test_env_hadoop_install_only
and possibly others. To reproduce:

pip install pytest-xdist pytest-repeat
pytest -n 10 --count=10 petastorm/hdfs/ 

Reader takes a long time to exit

I just converted a part of my spark dataset with the petastorm utilities, and now I'm trying to print the results of a single batch.

Each row of the dataset has 2 columns, each one with a (1, 24) shaped np.ndarray

I'm using this code to print a single row:

    with Reader(output_url) as reader:
        for i, row in enumerate(reader):
            print(row)
            break

It takes a decently long time to print the row, and then it takes even more time for the program to exit, which is super weird.

Am I doing anything wrong?

Reduce code complexity of worker_loop function

Currently, flake8 fails on worker_loop being too complex a function. We should evaluate whether the function can be refactored into smaller pieces.
The following check is currently failing:

flake8 . --count --max-complexity=10 --statistics

Enable the check as blocking once the code is refactored and the test passes.

Create a development.md page

  • Document development environment setup procedures
  • Using --cache-synthetic-dataset
  • Different pytest goodies: pytest-xdist, pytest-repeat, pytest-charm

Improve static code analysis

We should consider adding more static tests to the Travis CI build. PR #174 shows that we are currently missing some checks that could benefit us.

A reader predicate that references both partitioned and non-partitioned keys will fail

For example, the following test will fail (test_end_to_end):

def test_predicate_on_multiple_fields(self):
    expected_values = {'id': 0, 'id2': 0, 'partition': 'p_0'}
    reader = Reader(dataset_url=EndToEndDatasetToolkitTest._dataset_dir, shuffle=False,
                    reader_pool=DummyPool(),
                    predicate=EqualPredicate(expected_values))

    actual = reader.fetch(timeout_s=2)
    self.assertEqual(actual.id, expected_values['id'])
    self.assertEqual(actual.id2, expected_values['id2'])

    reader.stop()
    reader.join()

Using `six` before it is installed from `setup.py`

setup.py would import petastorm to get the __version__.
However, petastorm/__init__.py has import six, which would fail.

To reproduce:

$ virtualenv .env --no-site-packages && source .env/bin/activate && pip install -e .
New python executable in .env/bin/python
Installing setuptools, pip...done.
Obtaining file:///home/yevgeni/petastorm
  Running setup.py (path:/home/yevgeni/petastorm/setup.py) egg_info for package from file:///home/yevgeni/petastorm
    Traceback (most recent call last):
      File "<string>", line 17, in <module>
      File "/home/yevgeni/petastorm/setup.py", line 18, in <module>
        from petastorm import __version__
      File "petastorm/__init__.py", line 16, in <module>
        import six
    ImportError: No module named six
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):

  File "<string>", line 17, in <module>

  File "/home/yevgeni/petastorm/setup.py", line 18, in <module>

    from petastorm import __version__

  File "petastorm/__init__.py", line 16, in <module>

    import six

ImportError: No module named six

----------------------------------------
Cleaning up...
Command python setup.py egg_info failed with error code 1 in /home/yevgeni/petastorm
Storing debug log for failure in /home/yevgeni/.pip/pip.log

Set up a lint tool

Can we have a lint tool check that the code is Python 3 compatible, even if it's not covered by unit tests?
