Giter Site home page Giter Site logo

jdasoftwaregroup / kartothek Goto Github PK

View Code? Open in Web Editor NEW
161.0 15.0 53.0 2.14 MB

A consistent table management library in python

Home Page: https://kartothek.readthedocs.io/en/stable

License: MIT License

Python 99.97% Shell 0.03%
python pydata dask arrow parquet kartothek

kartothek's People

Contributors

amerkel2 avatar aniruddhgoteti avatar crepererum avatar dependabot[bot] avatar eacheson avatar fhoehle avatar fjetter avatar florian-jetter-by avatar hoffmann avatar imkumarg avatar infehm avatar jakob-ernst-by avatar jochen-ott-by avatar jonashaag avatar kshitij68 avatar lr4d avatar lucas-rademaker-by avatar marco-neumann-by avatar martin-haffner-by avatar mganesh1308 avatar mlondschien avatar nefta-kanilmaz-by avatar nerocorleone avatar pacman82 avatar steffen-schroeder-by avatar stephan-hesselmann-by avatar svoons avatar treebee avatar usha-nemani-by avatar xhochy avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kartothek's Issues

Test compatiblity with Hive and Spark

According to our open-sourcing blog post, kartothek "is designed with compatibility in mind to the implicit standard storage layout used by Dask, Spark, Hive and more."

If we advertise this, shouldn't we test that Spark and Hive are able to read the Parquet files produced by kartothek?

@crepererum commented to me that he was unable to read the files in a language other than Python, because of some timestamp issue if I remember correctly.

Missing read implementation for dask bag

Since there is already a function which can write a bag of DataFrames to a kartothek dataset it would be nice to have a way to read it too and get a bag of DataFrames back.

Upstream Bugs Meta-Issue

This is a meta-issue linking bugs in upstream libraries that we are aware of and that are somewhat specific to kartothek. To add issues, please edit / comment with a link to the upstream issue. We encourage proper issue reporting and fixing and try to be as cooperative as possible. This is not meant to blame anything or anyone, we just want to inform our users about known issues.

Do NOT link security issues before they are published officially!

Legend:

  • ⚠️ open issue, severe
  • ℹ️ open issue, not severe (e.g. performance)
  • ⌛️ fixed, waiting for release
  • 🗜 workaround implemented in kartothek (but should still be fixed)
  • ✅ fixed (but not for all releases that we support

Apache Arrow

  • ARROW-5028: Arrow to Parquet conversion drops and corrupts values, fixed in 0.15.0
  • ARROW-5166: Statistics for uint64 columns may overflow, fixed in 0.14.0
  • 🗜ARROW-6281: Produce chunked arrays for nested types in pyarrow.array (breaks index builds)

Normalize input to `list` properly in `normalize_args`

Currently, if a user wants to partition along column "x", and since columns are unique, thinks it is appropriate to provide a set as the value to the argument e.g. partition_on={"x"}; this would be "normalized" to [{"x"}] ( a list of sets), and then an error will be raised at some other part of the code where this input is unexpected.
The expected behavior of the normalization would be to normalize sets, tuples or similar iterables to a list such as ["x"] (a list of strings).

I do not consider this a bug as in the docs it states partition_on should be provided as a list.

Normalization function: kartothek.io_components.utils.normalize_arg

Dispatch by general index columns

Currently we support custom dispatching using the concat_partitions_on_primary_index flag which merges MetaPartitions along primary key values. This particularly allows for compaction logic to merge all buckets with the same partition keys. This logic can be extended in two steps:

  1. Allow compaction on partition primary keys
  2. Allow compaction on all combination of indices

1.) would be trivially possible by extending the interface and replacing concat_partitions_on_primary_index by a generic parameter compact_on or similar
2.) would require manipulation of passed on predicates, s.t. the loaded data is in fact only what was requested
3.) Allow dispatching by generic columns. This will be pretty inefficient but technically possible once the predicate manipulation of 2.) is available

This would be a pre-condition to eventually support an abstraction concerning logical vs physical partitions.

I believe the implementation should be straight forward and possibilities are large. Nevertheless, I'd like to collect some feedback here regarding scope and implementation.

commit_dataset does not work with nested partitions

The following code example

from kartothek.io.eager import commit_dataset, write_single_partition, create_empty_dataset_header
from kartothek.core.common_metadata import make_meta
import pandas as pd
import storefact


store = storefact.get_store_from_url("hfs:///tmp/some_directory")
for x in store.keys():
    store.delete(x)

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})

create_empty_dataset_header(
    store=store,
    dataset_uuid="uuid",
    table_meta={"table": make_meta(df, "table", ["a"])},
    partition_on=["a"],
)

partitions = []
for x in range(2):
    partitions.append(
        write_single_partition(
            store=store,
            dataset_uuid="uuid",
            data={"data": {"table": df}},
            partition_on=["a"],
        )
    )

commit_dataset(
    store=store,
    dataset_uuid="uuid",
    new_partitions=partitions,
    partition_on=["a"],
)

Will create the error message AttributeError: Accessing label attribute is not allowed while nested.

Additionally you can observe the behaviour that after commit_dataset the partitions list contains one less element. In my understanding commit_dataset should not manipulate input objects.

Index doesn't raise if queried with wrong data type

Our indices are type stable and we require the input type for a given query to be of the exact type the index is using. This is important to ensure valid predicate evaluation. The current behavior is inconsistent and doesn't raise as expected if the queries type is invalid

import pyarrow as pa
from kartothek.core.index import PartitionIndex, ExplicitSecondaryIndex, IndexBase

ind = IndexBase(
    column='col',
    index_dct={
        1234: ["part"]
    },
    dtype=pa.int64()
)

# expected to raise, returns empty set
ind.eval_operator("==", "1234")
# expected to raise but casts the value and returns {"part"}
ind.eval_operator("==", 1234.0)

# only valid vall
ind.eval_operator("==", 1234)

Run master each night

We should run master tests each night (easily configurable in the Travis UI) and get also notifications for all maintainers about the failures. Normally Travis will only notify the committers of the last commit.

Confusing function signatures

read_dataset_as_ddf has confusing signature. It is not clear which arguments are required and which are not. Since we only have kwags here I assume that the function should work even without input.

def read_dataset_as_ddf(
    dataset_uuid=None,
    store=None,
    table=None,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    predicates=None,
    factory=None,
):

Add documentation about store factories

We heavily rely on the usage of store factories to not (attempt to) serialize connections. We should add a section to the documentation explaining the reasoning, introduce examples and establish best practices (e.g. don't use lambdas)

`eager.store_dataframes_as_dataset` input is not satisfying `parse_input_to_metapartition` docstring specification

Summary:

  • {"data":{"table1": df, "table2": df3}} is an invalid input to store_dataframes_as_dataset (it should be valid)
  • {"table1": df, "table2": df3} is valid input to store_dataframes_as_dataset (but this format is not specified as such in docsring)

parse_input_to_metapartition docstring extract:

  1. Mode - Dictionary with partition information
        In this case, a dictionary is supplied where the keys describe the partition.
            * **label** - (optional) Unique partition label. If None is given, a UUID \
                        is generated using :func:`kartothek.core.uuid.gen_uuid`.
            * **data** - A dict or list of tuples. The keys represent the table name \
                        and the values are the actual payload data as a pandas.DataFrame.
            * **indices** - Deprecated, see the keyword argument `secondary_indices` to create indices.
                            A dictionary to describe the dataset indices. All \
                            partition level indices are finally merged using \
                            :func:`kartothek.io_components.metapartition.MetaPartition.merge_indices` \
                            into a single dataset index

Example (in quotes are debugging comments):

from kartothek.io.eager import store_dataframes_as_dataset
import numpy as np
import pandas as pd
from functools import  partial
from storefact import get_store_from_url

from tempfile import TemporaryDirectory

dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")

df = pd.DataFrame({"B": [pd.Timestamp("2019")]})

df3 = pd.DataFrame(
    {
        "B": pd.to_datetime(["20130102", "20190101"]),
        "L": [1, 4],
        "Q": [True, False],
    }
)
df3.dtypes
store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="multiple_partitioned_tables",
    dfs={"data":{"table1": df, "table2": df3}},
    partition_on='B',
)
"""
`obj` in `parse_input_to_metapartition` is
    {'data': [('data', {'table1':            B
    0 2019-01-01, 'table2':            B  L      Q
    0 2013-01-02  1   True
    1 2019-01-01  4  False})]}
"""

store_dataframes_as_dataset(
    store=store_factory,
    dataset_uuid="multiple_partitioned_tables",
    dfs={"table1": df, "table2": df3},
    partition_on='B',
)
"""
`obj` in `parse_input_to_metapartition` is
    {'data': [('table1',            B
    0 2019-01-01), ('table2',            B  L      Q
    0 2013-01-02  1   True
    1 2019-01-01  4  False)]}
"""

Segmentation fault when running pytest on OSX

Pytest breaks when we run kartothek on OSX Mojave

I have followed the steps mentioned in the CONTRIBUTING.md.

It breaks with Segmentation fault: 11 when running test_eq module in the test_metapartition.py.

Here the pytest logs:

(kartothek-dev) ganesh:kartothek 10251XX$ pytest -v -k test_metapartition.py
=================================================================================================== test session starts ====================================================================================================
platform darwin -- Python 3.6.6, pytest-4.5.0, py-1.8.0, pluggy-0.12.0 -- /Users/10251XX/open_kartothek/kartothek-dev/bin/python3.6
cachedir: .pytest_cache
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/10251XX/open_kartothek/kartothek/.hypothesis/examples')
rootdir: /Users/10251XX/open_kartothek/kartothek, inifile: setup.cfg
plugins: hypothesis-4.23.9, cov-2.7.1, mock-1.10.4, flake8-1.0.4
collected 1524 items / 1455 deselected / 69 selected                                                                                                                                                                       

tests/io_components/test_metapartition.py::test_store_single_dataframe_as_partition_no_metadata[4] PASSED                                                                                                            [  1%]
tests/io_components/test_metapartition.py::test_store_multiple_dataframes_as_partition[json-4] PASSED                                                                                                                [  2%]
tests/io_components/test_metapartition.py::test_store_single_dataframe_as_partition[json-4-dataset_uuid/core/test_label.parquet] PASSED                                                                              [  4%]
tests/io_components/test_metapartition.py::test_store_multiple_dataframes_as_partition[msgpack-4] PASSED                                                                                                             [  5%]
tests/io_components/test_metapartition.py::test_store_single_dataframe_as_partition[msgpack-4-dataset_uuid/core/test_label.parquet] PASSED                                                                           [  7%]
tests/io_components/test_metapartition.py::test_load_dataframes[4-True] PASSED                                                                                                                                       [  8%]
tests/io_components/test_metapartition.py::test_load_dataframes[4-False] PASSED                                                                                                                                      [ 10%]
tests/io_components/test_metapartition.py::test_remove_dataframes[4] PASSED                                                                                                                                          [ 11%]
tests/io_components/test_metapartition.py::test_load_dataframes_selective[4] PASSED                                                                                                                                  [ 13%]
tests/io_components/test_metapartition.py::test_load_dataframes_columns_projection[4] PASSED                                                                                                                         [ 14%]
tests/io_components/test_metapartition.py::test_load_dataframes_columns_raises_missing[4] PASSED                                                                                                                     [ 15%]
tests/io_components/test_metapartition.py::test_to_dict[4] PASSED                                                                                                                                                    [ 17%]
tests/io_components/test_metapartition.py::test_reconstruct_date_index[4-True] PASSED                                                                                                                                [ 18%]
tests/io_components/test_metapartition.py::test_reconstruct_date_index[4-False] PASSED                                                                                                                               [ 20%]
tests/io_components/test_metapartition.py::test_column_string_cast[4] PASSED                                                                                                                                         [ 21%]
tests/io_components/test_metapartition.py::test_from_dict PASSED                                                                                                                                                     [ 23%]
tests/io_components/test_metapartition.py::test_eq Segmentation fault: 11
(kartothek-dev) ganesh:kartothek 10251XX$ 

Here the details of the system:

Darwin ganesh.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64

Here the details of packages installed:

alabaster==0.7.12
appnope==0.1.0
aspy.yaml==1.3.0
asv==0.4
atomicwrites==1.3.0
attrs==19.1.0
Babel==2.7.0
backcall==0.1.0
certifi==2019.3.9
cfgv==2.0.0
chardet==3.0.4
Click==7.0
cloudpickle==1.1.1
coverage==4.5.3
dask==1.2.2
decorator==4.4.0
distributed==1.28.1
docutils==0.14
entrypoints==0.3
flake8==3.7.7
flake8-mutable==1.2.0
HeapDict==1.0.0
hypothesis==4.23.9
identify==1.4.3
idna==2.8
imagesize==1.1.0
importlib-metadata==0.15
importlib-resources==1.0.2
ipython==7.5.0
ipython-genutils==0.2.0
jedi==0.13.3
Jinja2==2.10.1
-e 
locket==0.2.0
MarkupSafe==1.1.1
mccabe==0.6.1
more-itertools==7.0.0
msgpack==0.6.1
nodeenv==1.3.3
numpy==1.16.4
packaging==19.0
pandas==0.24.2
parso==0.4.0
partd==0.3.10
pexpect==4.7.0
pickleshare==0.7.5
pluggy==0.12.0
pre-commit==1.16.1
prompt-toolkit==2.0.9
psutil==5.6.2
ptyprocess==0.6.0
py==1.8.0
pyarrow==0.13.0
pycodestyle==2.5.0
pyflakes==2.1.1
Pygments==2.4.2
pyparsing==2.4.0
pytest==4.5.0
pytest-cov==2.7.1
pytest-flake8==1.0.4
pytest-mock==1.10.4
python-dateutil==2.8.0
pytz==2019.1
PyYAML==5.1
requests==2.22.0
setuptools-scm==3.3.3
simplejson==3.16.0
simplekv==0.12.0
six==1.12.0
snowballstemmer==1.2.1
sortedcontainers==2.1.0
Sphinx==2.0.1
sphinx-rtd-theme==0.4.3
sphinxcontrib-applehelp==1.0.1
sphinxcontrib-devhelp==1.0.1
sphinxcontrib-htmlhelp==1.0.2
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.2
sphinxcontrib-serializinghtml==1.1.3
storefact==0.8.4
tblib==1.4.0
toml==0.10.0
toolz==0.9.0
tornado==6.0.2
traitlets==4.3.2
uritools==2.2.0
urllib3==1.25.3
virtualenv==16.6.0
wcwidth==0.1.7
zict==0.1.4
zipp==0.5.1
zstandard==0.11.1

Thanks,
Ganesh

Keyword argument for secondary_indices is not used consistently in the code base

We introduced the keyword argument secondary_indices to allow writing pipelines to calculate an index on the fly instead of having the user to provide this as an input.
This keyword has been introduced only inconsistently, yet and many pipelines still don't use it. We should enable all writing pipelines to use this keyword argument to eventually deprecate the explicit user input.

Kartothek attempts to use simplekv incorrectly

System: OSX
Python version: 3.6.7
Kartothek version: 3.0.0
simplekv version: 0.12.0

The following code (which is a fairly minimal example) is giving me issues:

import pandas as pd
from kartothek.io.eager import store_dataframes_as_dataset
from simplekv.fs import FilesystemStore

store = FilesystemStore('./data')

df = pd.DataFrame(
    {
        "A": [1,2,3],
        "B": [1,2,3],
    }
)
dm = store_dataframes_as_dataset(store, "peter_df", df, metadata_version=4)

Kartothek tries to write to simplekv with the key peter_df/table/76db5d03cb294fbb913544d70d3ec353.parquet. As far as I can tell, simplekv does not accept / as a character in its key name, so this fails.

Below is a detiailed traceback of the error:

Writing dataframe failed.
'peter_df/table/9131c25517fb4baabd1a9e4827f65fca.parquet' contains illegal characters
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 2 columns):
A    3 non-null int64
B    3 non-null int64
dtypes: int64(2)
memory usage: 128.0 bytes

   A  B
0  1  1
1  2  2
2  3  3
Traceback (most recent call last):
  File "k_exp.py", line 13, in <module>
    dm = store_dataframes_as_dataset(store, "peter_df", df, metadata_version=4)
  File "<decorator-gen-5>", line 2, in store_dataframes_as_dataset
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io_components/utils.py", line 207, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io_components/utils.py", line 205, in _wrapper
    return function(*args, **kwargs)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io/eager.py", line 478, in store_dataframes_as_dataset
    store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 105, in _impl
    method_return = method(mp, *method_args, **method_kwargs)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 990, in store_dataframes
    six.reraise(exc_type, exc_value, exc_traceback)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 973, in store_dataframes
    file_dct[table] = df_serializer.store(store, key, df)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/kartothek/serialization/_parquet.py", line 199, in store
    store.put(key, buf.getvalue().to_pybytes())
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/simplekv/__init__.py", line 147, in put
    self._check_valid_key(key)
  File "/anaconda3/envs/exp/lib/python3.6/site-packages/simplekv/__init__.py", line 192, in _check_valid_key
    raise ValueError('%r contains illegal characters' % key)
ValueError: 'peter_df/table/9131c25517fb4baabd1a9e4827f65fca.parquet' contains illegal characters

Add type annotations

Abstract

We use types in our documentation but it would be better to provide proper annotations.

Example

From:

def add(a, b):
    """
    Adds to numbers.

    Parameters
    ----------
    a: Union[float, int]
        First number to add.
    b: Union[float, int]
        Second number to add.

    Returns
    -------
    c: float
        Sum of both.
    """
    ...

To:

from typing import Union


def add(a: Union[float, int], b: Union[float, int]) -> float:
    """
    Adds to numbers.

    Parameters
    ----------
    a:
        First number to add.
    b:
        Second number to add.

    Returns
    -------
    c:
        Sum of both.
    """
    ...

Please note the colons and new lines after the parameter and return value names, otherwise, sphinx docs break (sphinx will treat your docs as types).

Make `table` kwarg in `read_table*` use the default table name if unspecified

Currently, one can create a dataset without specifying a table name, and a default table name ("table") will be assigned as the name of that table.
However, when one then tries to read that dataset using read_table, it becomes necessary to explicitly specify a table name:

>>> from kartothek.io.eager import read_table
>>> read_table(dataset_uuid=dataset_uuid, store=store_factory)

TypeError: Parameter `table` is not optional.

I suggest the table parameter in kartothek.io.*.read_table* functions takes the default table name of kartothek if it is not specified, just as the store/update functions do.

This would fail for datasets created with old kartothek versions (due to the change in default table name in v3.0), but AFAIK most of the read_table implementations were not available at that time. In case of failure, in the error message it could be clarified that the default table name was used

Test minimal requirements

Add lower requirement constraints to all requirements and add one travis test environment that uses it.

Hints:

  • Have a look at compile_requirements.sh and .travis.yml

read_dataset_as_ddf does not return stored datetime index

Problem description

When reading kartothek dataset with read_dataset_as_ddf I am losing original datetime index when stored with update_dataset_from_ddf. Even though children parquet files in kartothek datasets' directory still keep the index as datetime. Can you please take a look on that? Original data comes from machine sensors, being mocked here:

Example code (ideally copy-pastable)

import pandas as pd
from functools import partial
from storefact import get_store_from_url
from kartothek.io.dask.dataframe import update_dataset_from_ddf
from kartothek.io.dask.dataframe import read_dataset_as_ddf
import dask.dataframe as dd

# Create dask dataframe
ddf = (dd.from_pandas(
        pd.DataFrame({'dateTime':[pd.Timestamp('2019-08-15T23:59:58'),pd.Timestamp('2019-08-15T23:59:59')],
                    'plantCode':['PL1','PL2'],
                    'value':[80.0,90.0],
                    'variableName':['sensor_1','sensor_2']}), 
        npartitions=1)
        .assign(date=lambda x:x['dateTime'].dt.date)
        .set_index('dateTime')
     )

Check index type

ddf.index

Output being correctly datetime64[ns]

Dask Index Structure:
npartitions=1
2019-08-15 23:59:58    datetime64[ns]
2019-08-15 23:59:59               ...
Name: dateTime, dtype: datetime64[ns]
Dask Name: sort_index, 7 tasks

Creating dataset and reading it back

store = './karto/'
store_factory = partial(get_store_from_url, "hfs://" + store)

update_dataset_from_ddf(ddf=ddf,
                        store=store_factory,
                        dataset_uuid='test_data',
                        table = 'table',
                        partition_on=['date']).compute()

ds = read_dataset_as_ddf(dataset_uuid='test_data',
                         store=store_factory,
                         table='table')

Checking index

ds.index

Outputs incorrectly int64[ns]

Dask Index Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: from-delayed, 4 tasks

While reading dd.read_parquet one of the children parquets of kartothek directory keeps the datetime index

dd.read_parquet('./karto/test_data/table/date=2019-08-15/68dc115bd83d4de1a701ebc567c6e6f6.parquet').index

Output being correctly datetime64[ns]

Dask Index Structure:
npartitions=1
    datetime64[ns]
               ...
Name: dateTime, dtype: datetime64[ns]
Dask Name: read-parquet, 2 tasks

Used versions

# Paste your output of `pip freeze` or `conda list` here
# Name                    Version                   Build  Channel
appnope                   0.1.0                 py37_1000    conda-forge
arrow-cpp                 0.13.0           py37h8cfbac2_0  
astroid                   2.2.5                    py37_0  
atomicwrites              1.3.0                    pypi_0    pypi
attrs                     19.1.0                   pypi_0    pypi
backcall                  0.1.0                      py_0    conda-forge
bleach                    3.1.0                    pypi_0    pypi
bokeh                     1.3.4                    py37_0    conda-forge
boost-cpp                 1.67.0               h1de35cc_4  
brotli                    1.0.7                h0a44026_0  
bzip2                     1.0.8                h1de35cc_0  
ca-certificates           2019.9.11            hecc5488_0    conda-forge
certifi                   2019.9.11                py37_0    conda-forge
chardet                   3.0.4                    pypi_0    pypi
click                     7.0                        py_0    conda-forge
cloudpickle               1.2.1                      py_0    conda-forge
coverage                  4.5.4                    pypi_0    pypi
croniter                  0.3.30                   pypi_0    pypi
cytoolz                   0.10.0           py37h01d97ff_0    conda-forge
dask                      2.3.0                      py_0    conda-forge
dask-core                 2.3.0                      py_0    conda-forge
decorator                 4.4.0                      py_0    conda-forge
defusedxml                0.6.0                    pypi_0    pypi
distributed               2.3.2                      py_1    conda-forge
docker                    4.0.2                    pypi_0    pypi
double-conversion         3.1.5                haf313ee_1  
entrypoints               0.3                      pypi_0    pypi
fastparquet               0.3.2            py37heacc8b8_0    conda-forge
freetype                  2.10.0               h24853df_1    conda-forge
fsspec                    0.4.3                      py_0    conda-forge
gflags                    2.2.2                h0a44026_0  
glog                      0.4.0                h0a44026_0  
heapdict                  1.0.0                 py37_1000    conda-forge
icu                       58.2                 h4b95b61_1  
idna                      2.8                      pypi_0    pypi
importlib-metadata        0.20                     pypi_0    pypi
ipykernel                 5.1.2            py37h5ca1d4c_0    conda-forge
ipython                   7.8.0            py37h5ca1d4c_0    conda-forge
ipython_genutils          0.2.0                      py_1    conda-forge
ipywidgets                7.5.1                    pypi_0    pypi
isort                     4.3.21                   py37_0  
jedi                      0.15.1                   py37_0    conda-forge
jinja2                    2.10.1                     py_0    conda-forge
jpeg                      9c                h1de35cc_1001    conda-forge
json5                     0.8.5                    pypi_0    pypi
jsonschema                3.0.2                    pypi_0    pypi
jupyter                   1.0.0                    pypi_0    pypi
jupyter-console           6.0.0                    pypi_0    pypi
jupyter_client            5.3.1                      py_0    conda-forge
jupyter_core              4.4.0                      py_0    conda-forge
jupyterlab                1.1.1                    pypi_0    pypi
jupyterlab-server         1.0.6                    pypi_0    pypi
kartothek                 3.3.0                    pypi_0    pypi
lazy-object-proxy         1.4.2            py37h1de35cc_0  
libblas                   3.8.0               12_openblas    conda-forge
libboost                  1.67.0               hebc422b_4  
libcblas                  3.8.0               12_openblas    conda-forge
libcxx                    4.0.1                hcfea43d_1  
libcxxabi                 4.0.1                hcfea43d_1  
libedit                   3.1.20181209         hb402a30_0  
libevent                  2.1.8                ha12b0ac_0  
libffi                    3.2.1                h475c297_4  
libgfortran               4.0.0                         2    conda-forge
libiconv                  1.15                 hdd342a3_7  
liblapack                 3.8.0               12_openblas    conda-forge
libopenblas               0.3.7                h4bb4525_1    conda-forge
libpng                    1.6.37               h2573ce8_0    conda-forge
libprotobuf               3.6.0                hd9629dc_0  
libsodium                 1.0.17               h01d97ff_0    conda-forge
libtiff                   4.0.10               hcb84e12_2  
llvm-openmp               8.0.1                h770b8ee_0    conda-forge
llvmlite                  0.29.0                   pypi_0    pypi
locket                    0.2.0                      py_2    conda-forge
lz4-c                     1.8.3             h6de7cb9_1001    conda-forge
markupsafe                1.1.1            py37h1de35cc_0    conda-forge
marshmallow               3.0.2                    pypi_0    pypi
marshmallow-oneofschema   2.0.1                    pypi_0    pypi
mccabe                    0.6.1                    py37_1  
mistune                   0.8.4                    pypi_0    pypi
more-itertools            7.2.0                    pypi_0    pypi
msgpack-python            0.6.1            py37h04f5b5a_0    conda-forge
mypy-extensions           0.4.1                    pypi_0    pypi
nbconvert                 5.6.0                    pypi_0    pypi
nbformat                  4.4.0                    pypi_0    pypi
ncurses                   6.1                  h0a44026_1  
notebook                  6.0.1                    pypi_0    pypi
numba                     0.45.1           py37h86efe34_0    conda-forge
numpy                     1.17.1           py37h6b0580a_0    conda-forge
olefile                   0.46                       py_0    conda-forge
openssl                   1.1.1c               h01d97ff_0    conda-forge
packaging                 19.0                       py_0    conda-forge
pandas                    0.25.1           py37h86efe34_0    conda-forge
pandas-bokeh              0.3                      pypi_0    pypi
pandocfilters             1.4.2                    pypi_0    pypi
parso                     0.5.1                      py_0    conda-forge
partd                     1.0.0                      py_0    conda-forge
pendulum                  2.0.5                    pypi_0    pypi
pexpect                   4.7.0                    py37_0    conda-forge
pickleshare               0.7.5                 py37_1000    conda-forge
pillow                    5.3.0           py37hbddbef0_1000    conda-forge
pip                       19.2.2                   py37_0  
pluggy                    0.12.0                   pypi_0    pypi
prefect                   0.6.3                    pypi_0    pypi
prometheus-client         0.7.1                    pypi_0    pypi
prompt_toolkit            2.0.9                      py_0    conda-forge
psutil                    5.6.3            py37h01d97ff_0    conda-forge
ptyprocess                0.6.0                   py_1001    conda-forge
py                        1.8.0                    pypi_0    pypi
pyarrow                   0.13.0           py37h0a44026_0  
pygments                  2.4.2                      py_0    conda-forge
pylint                    2.3.1                    py37_0  
pyparsing                 2.4.2                      py_0    conda-forge
pyrsistent                0.15.4                   pypi_0    pypi
pyscaffold                3.2.1                      py_0    conda-forge
pytest                    5.1.2                    pypi_0    pypi
pytest-cov                2.7.1                    pypi_0    pypi
python                    3.7.4                h359304d_1  
python-dateutil           2.8.0                      py_0    conda-forge
python-slugify            3.0.3                    pypi_0    pypi
python-snappy             0.5.4            py37h1e06ddd_0    conda-forge
pytz                      2019.2                     py_0    conda-forge
pytzdata                  2019.2                   pypi_0    pypi
pyyaml                    5.1.2            py37h01d97ff_0    conda-forge
pyzmq                     18.0.2           py37hee98d25_2    conda-forge
qtconsole                 4.5.5                    pypi_0    pypi
re2                       2019.08.01           h0a44026_0  
readline                  7.0                  h1de35cc_5  
requests                  2.22.0                   pypi_0    pypi
send2trash                1.5.0                    pypi_0    pypi
setuptools                41.0.1                   py37_0  
simplejson                3.16.0                   pypi_0    pypi
simplekv                  0.13.0                   pypi_0    pypi
six                       1.12.0                py37_1000    conda-forge
snappy                    1.1.7             h6de7cb9_1002    conda-forge
sortedcontainers          2.1.0                      py_0    conda-forge
sqlite                    3.29.0               ha441bb4_0  
storefact                 0.9.0                    pypi_0    pypi
tabulate                  0.8.3                    pypi_0    pypi
tblib                     1.4.0                      py_0    conda-forge
terminado                 0.8.2                    pypi_0    pypi
testpath                  0.4.2                    pypi_0    pypi
text-unidecode            1.2                      pypi_0    pypi
thrift                    0.11.0          py37h0a44026_1001    conda-forge
thrift-cpp                0.11.0               hd79cdb6_3  
tk                        8.6.8                ha441bb4_0  
toml                      0.10.0                   pypi_0    pypi
toolz                     0.10.0                     py_0    conda-forge
tornado                   6.0.3            py37h01d97ff_0    conda-forge
traitlets                 4.3.2                 py37_1000    conda-forge
typing                    3.7.4.1                  pypi_0    pypi
typing-extensions         3.7.4                    pypi_0    pypi
uritools                  2.2.0                    pypi_0    pypi
urllib3                   1.25.3                   pypi_0    pypi
wcwidth                   0.1.7                      py_1    conda-forge
webencodings              0.5.1                    pypi_0    pypi
websocket-client          0.56.0                   pypi_0    pypi
wheel                     0.33.4                   py37_0  
widgetsnbextension        3.5.1                    pypi_0    pypi
wrapt                     1.11.2           py37h1de35cc_0  
xlrd                      1.2.0                    pypi_0    pypi
xz                        5.2.4                h1de35cc_4  
yaml                      0.1.7             h1de35cc_1001    conda-forge
zeromq                    4.3.2                h6de7cb9_2    conda-forge
zict                      1.0.0                      py_0    conda-forge
zipp                      0.6.0                    pypi_0    pypi
zlib                      1.2.11               h1de35cc_3  
zstandard                 0.11.1                   pypi_0    pypi
zstd                      1.3.7                h5bba6e5_0  

MetaPartition dispatch contains the dataset metadata

Problem description

Currently dispatch_metapartitions_from_factory yields MetaPartitions holding the entire user metadata of the dataset for every MP. For large customer metadata this slows down scheduling significantly and should be avoided

add `date_as_object` flag to `get_indices_as_dataframe`

Description

Currently core.dataset.DatasetMetadataBase.get_indices_as_dataframe loads date type columns as object. A date_as_object flag for this function would be useful.

Example code

from collections import OrderedDict
from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.core.dataset import DatasetMetadata
df = pd.DataFrame(
    OrderedDict(
        [
            ("P", [1]),
            ("L", [1]),
            ("TARGET", [1]),
            ("DATE", [datetime.date(2010, 1, 1)]),
        ]
    )
)
store_dataframes_as_dataset(store=store, dataset_uuid="some_uuid", dfs=df, partition_on="DATE")
dmd = DatasetMetadata.load_from_store("some_uuid", store)
dmd = dmd.load_partition_indices()
idx = dmd.get_indices_as_dataframe()["DATE"]
# returns dtype('O')

Used versions

alabaster==0.7.12
appnope==0.1.0
aspy.yaml==1.3.0
astroid==2.2.5
asv==0.4.1
atomicwrites==1.3.0
attrs==19.1.0
Babel==2.7.0
backcall==0.1.0
certifi==2019.6.16
cfgv==2.0.1
chardet==3.0.4
Click==7.0
cloudpickle==1.2.1
coverage==4.5.4
dask==2.2.0
decorator==4.4.0
distributed==2.2.0
docutils==0.15.2
entrypoints==0.3
flake8==3.7.8
flake8-mutable==1.2.0
fsspec==0.4.1
HeapDict==1.0.0
hypothesis==4.32.3
identify==1.4.5
idna==2.8
imagesize==1.1.0
importlib-metadata==0.19
importlib-resources==1.0.2
ipython==7.7.0
ipython-genutils==0.2.0
isort==4.3.21
jedi==0.14.1
Jinja2==2.10.1
-e git+https://github.com/JDASoftwareGroup/kartothek.git@beafc3a3bb6df08d81489722ec4413f5cda42fda#egg=kartothek
lazy-object-proxy==1.4.1
locket==0.2.0
MarkupSafe==1.1.1
mccabe==0.6.1
more-itertools==7.2.0
msgpack==0.6.1
nodeenv==1.3.3
numpy==1.17.0
packaging==19.1
pandas==0.25.0
parso==0.5.1
partd==1.0.0
pexpect==4.7.0
pickleshare==0.7.5
pluggy==0.12.0
pre-commit==1.18.0
prompt-toolkit==2.0.9
psutil==5.6.3
ptyprocess==0.6.0
py==1.8.0
pyarrow==0.13.0
pycodestyle==2.5.0
pyflakes==2.1.1
Pygments==2.4.2
pylint==2.3.1
pyparsing==2.4.2
pytest==5.0.1
pytest-cov==2.7.1
pytest-flake8==1.0.4
pytest-mock==1.10.4
python-dateutil==2.8.0
pytz==2019.2
PyYAML==5.1.2
requests==2.22.0
setuptools-scm==3.3.3
simplejson==3.16.0
simplekv==0.13.0
six==1.12.0
snowballstemmer==1.9.0
sortedcontainers==2.1.0
Sphinx==2.1.2
sphinx-rtd-theme==0.4.3
sphinxcontrib-applehelp==1.0.1
sphinxcontrib-devhelp==1.0.1
sphinxcontrib-htmlhelp==1.0.2
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.2
sphinxcontrib-serializinghtml==1.1.3
storefact==0.9.0
tblib==1.4.0
toml==0.10.0
toolz==0.10.0
tornado==6.0.3
traitlets==4.3.2
typed-ast==1.3.4
uritools==2.2.0
urllib3==1.25.3
virtualenv==16.7.2
wcwidth==0.1.7
wrapt==1.11.2
zict==1.0.0
zipp==0.5.2
zstandard==0.11.1

[QUESTION] Integration of Zarr with karothek

Hi Folks,

First and foremost, I've spent the last few hours working my way through your blog posts, RTD documentation, and source code documentation and really like where you guys are going with karothek. Thank you so much for open sourcing this project. Having a bias for the JVM I've been interested in Apache Iceberg for some time but Python is where we need to be to process the N-dimensional array data (Earth orbiting remote sensing missions which natively produce write-optimized netCDF4/5 and HDF5/EOS) we work with at NASA so karothek was a real pleasant surprise.

I wanted to ask if anyone here has been working with Zarr? I envision kartothek as the dataset and table management layer with the actual arrays being stored as Zarr implementations which are highlighly read and analysis optimized... basically analysis ready (in something like Dask)!

Thanks again for sharing this under a permissive license... excellent work.

Set up CI job for OSX

Currently we semi-support OSX. We should make sure how far our support goes by setting up a pipeline. A potential issue might be that dask tests do not succeed since dask.distributed is not OSX tested

master broken

Problem description

Master is broken. The reason is that we use dask together with pyarrow, but dask only likes certain versions of pyarrow. So we need to make sure during the pinning that they work well together.

When using hs3, MinIO, and store_delayed_as_dataset, the task never finishes

Below is an example that fails on Linux, MinIO, and kartothek 3.0.1. The program never executes print ("Finished").

The MinIO browser does show the creation of the bucket, the hierarchy, and the file.

I do not see any messages from the `dask-scheduler' except 'heartbeat_worker'.

What am I doing wrong or is there any more information that I can provide to help debug the problem?

import sys

import pandas as pd

from storefact import get_store_from_url
from functools import partial

from kartothek.io.dask.delayed import store_delayed_as_dataset

def main(argv=None):

    if argv is None:
        argv = sys.argv

    verbose = False
    useTemporaryDirectory = True

    df = pd.DataFrame({
        'file' : ['file 1', ],
        'description' : ['description 1'],
        'run' : ['run 1',],
    }
    )

    store_factory = partial(get_store_from_url,
                            'hs3://secret:[email protected]:9000/kart?create_if_missing=true')
    
    print (df.head())

    input_list_of_partitions = [
        {
            'label': 'Run Record Description',
            'data': [
                ('MetaData', df),
            ]
        },
    ]

    print ('Using Dask')
    task = store_delayed_as_dataset(input_list_of_partitions,
                                    store=store_factory,
                                    dataset_uuid='uuid-1',
                                    metadata={'dataset': 'dateset'},  # This is optional dataset metadata
                                    metadata_version=4,
    )

    task.compute()
    print ("Finished")
        
    return

if __name__ == '__main__':
    sys.exit(main() or 0)

Remove six

kartothek once was Python2+3 compatible, which is not the case anymore. We can safely remove six now and replace it with proper Python3-only code.

Creating a dataset with a non-existing column as explicit index does not raise

# -*- coding: utf-8 -*-

import pandas as pd
from functools import partial
import storefact

from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.core.index import ExplicitSecondaryIndex


def get_storefactory(store_config):
    return partial(storefact.get_store, **store_config)


store_config = {"type": "hfs", "path": "some_directory"}
store = get_storefactory(store_config)
partitions = [
    {
        "label": "part1",
        "data": [("core", pd.DataFrame({"p": [1, 2]}))],
        "indices": {"x": ExplicitSecondaryIndex("x", {1: ["part1"], 2: ["part2"]})},
    }
]
store_dataframes_as_dataset(
    dfs=partitions,
    store=store,
    metadata={"dataset": "metadata"},
    dataset_uuid="dataset_uuid",
)

# Returns: DatasetMetadata(uuid=dataset_uuid, tables=['core'], partition_keys=[], metadata_version=4, indices=['x'], explicit_partitions=True)

Format Breaking Changes Candidates

This is a meta issue describing which changes we would like to make now but which are incompatible with the current format.

Type Normalization

The following type normalizations are currently not implemented but could be:

  • decimal128[P, S] -> decimal128[38, S] (38 is the max for 128 bits)
  • date{32, 64} -> date64
  • time{32, 64}[U] -> time64[U]
  • structs (nested normalization)

Index Handling

Reject non-integer/range indices, use reset_index and drop index information before writing data. Always restore as normalized (reset_index) indices, even when applying predicates.

Pandas-specific Metadata

Pandas-specific metadata is part of the Arrow schema but is not part of the Arrow Type system. It captures information like the index type. If Index Handling is implemented, we could drop the entire pandas metadata field. This would simplify interopt with other languages/frameworks.

Labels

Use UUIDs everwhere and reject user-provided labels.

Inconsistent type of kwarg 'store' across write and update

The eager write functions appear to expect the argument supplied to store to directly be a store object, whereas the update function appears to expect a factory (python callable) - can it be standardized one way or another please?

   import numpy as np
   import pandas as pd
   from functools import partial
   from storefact import get_store_from_url
   from tempfile import TemporaryDirectory
   from kartothek.io.eager import store_dataframes_as_dataset
   from kartothek.io.eager import update_dataset_from_dataframes

   df = pd.DataFrame(
       {
           "A": 1.,
           "B": pd.Timestamp("20130102"),
           "C": pd.Series(1, index=list(range(4)), dtype="float32"),
           "D": np.array([3] * 4, dtype="int32"),
           "E": pd.Categorical(["test", "train", "test", "train"]),
           "F": "foo",
       }
   )

   dataset_dir = TemporaryDirectory()
   store = get_store_from_url(f"hfs://{dataset_dir.name}") 

   dm = store_dataframes_as_dataset(
      store, #store object works fine here  
      "a_unique_dataset_identifier", 
      df, 
      metadata_version=4
   )

   another_df = pd.DataFrame(
       {
           "A": 2.,
           "B": pd.Timestamp("20190604"),
           "C": pd.Series(2, index=list(range(4)), dtype="float32"),
           "D": np.array([6] * 4, dtype="int32"),
           "E": pd.Categorical(["test", "train", "test", "train"]),
           "F": "bar",
       }
   )

   store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")

   dm = update_dataset_from_dataframes(
       [another_df],
       store=store_factory, #but this needs to be a callable
       dataset_uuid="a_unique_dataset_identifier"
       )
   dm

Dispatch by ignores predicates

Problem description

Predicates are not respected if used in conjunction with dipatch_by

Example code

import storefact
import tempfile
import pandas as pd
import shutil
from kartothek.io.eager import store_dataframes_as_dataset, build_dataset_indices
from kartothek.io.dask.delayed import read_dataset_as_delayed


folder = tempfile.mkdtemp(prefix="ktk_test_")
s = storefact.get_store_from_url(f"hfs://{folder}")


df = pd.DataFrame({"a": range(10), "b": [0, 1] * 5, "c": [0, 1, 2, 3, 4] * 2})
store_dataframes_as_dataset(s, "test", [df], partition_on=["c"])
build_dataset_indices(lambda: s, "test", ["b"])

assert (
        len(
            read_dataset_as_delayed(
                dataset_uuid="test",
                store=lambda: s,
                predicates=[[("b", "in", [1])]],
                dispatch_by=["b"],
            )
        )
        == 1
    )

Using s3 to store datasets

I am using minio to server my s3 context.

The creation of the store is:

from storefact import get_store_from_url
store = get_store_from_url('s3://key:secret@internal:9000/kart?create_if_missing=true')

Then I use the following to write the df:

from kartothek.io.eager import store_dataframes_as_dataset
dm = store_dataframes_as_dataset(store,
                                                          'runRecordDescriptionTable',
                                                          df,
                                                          metadata_version=4)    

But the following error occurs:

Traceback (most recent call last):
  File "s3fsLoad.py", line 243, in <module>
    sys.exit(main() or 0)
  File "s3fsLoad.py", line 225, in main
    metadata_version=4)    
  File "</home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/decorator.py:decorator-gen-5>", line 2, in store_dataframes_as_dataset
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io_components/utils.py", line 207, in normalize_args
    return _wrapper(*args, **kwargs)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io_components/utils.py", line 205, in _wrapper
    return function(*args, **kwargs)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io/eager.py", line 478, in store_dataframes_as_dataset
    store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 105, in _impl
    method_return = method(mp, *method_args, **method_kwargs)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 990, in store_dataframes
    six.reraise(exc_type, exc_value, exc_traceback)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/io_components/metapartition.py", line 973, in store_dataframes
    file_dct[table] = df_serializer.store(store, key, df)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/kartothek/serialization/_parquet.py", line 199, in store
    store.put(key, buf.getvalue().to_pybytes())
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/simplekv/__init__.py", line 147, in put
    self._check_valid_key(key)
  File "/home/josephwinston/anaconda3/envs/kartothek/lib/python3.6/site-packages/simplekv/__init__.py", line 192, in _check_valid_key
    raise ValueError('%r contains illegal characters' % key)
ValueError: 'runRecordDescriptionTable/table/1d2e32e172f949d4a90d6e727ea0829e.parquet' contains illegal characters

It appears that the problem is the s3 bucket name is runRecordDescriptionTable/table/1d2e32e172f949d4a90d6e727ea0829e.parquet'

Is there a transform that I need to make to ensure the bucket name is correct?

Add (secondary) indices section to documentation

Draft from #62 :


Secondary Indices
-----------------

The ability to build and maintain `inverted indices <https://en.wikipedia.org/wiki/Inverted_index>`_
are an additional feature provided by ``kartothek``.
In general, an index is a data structure used to improve
the speed of read queries. In the context of ``kartothek`` an index is a data structure
that contains a mapping of every unique value of a given column to references to all the
partitions where this value occurs.

While this index has a one-to-one mapping of column values to partition references,
secondary indices have the advantage of being able to contain one-to-many mappings of
column values to partition references.

Writing a dataset with a secondary index:

.. ipython:: python

    from kartothek.io.iter import store_dataframes_as_dataset__iter

    # "Generate" 5 dataframes
    df_gen = (
        pd.DataFrame({"date": pd.Timestamp(f"2020-01-0{i}"), "X": np.random.choice(10, 10)})
        for i in range(1, 6)
    )

    dm = store_dataframes_as_dataset__iter(
        df_gen,
        store_factory,
        "secondarily_indexed",
        partition_on="date",
        secondary_indices="X",
    )
    dm

    dm = dm.load_all_indices(store_factory())
    dm.indices["X"].eval_operator("==", 0)  # Show files where `X == 0`


As can be seen from the example above, both ``partition_on`` and ``secondary_indices``
can be specified together. Multiple ``secondary_indices`` can also be added as a list of
strings.

Contrary to ``partition_on``, the order of columns is ignored for ``secondary_indices``.

Improve dask no-cloudpickle testing

Dask.Distributed currently tries to use builtin pickle and falls back to the potentially more inefficient cloudpickle in case of an exception. We should ensure that kartothek does not need this fallback. We're already doing this in a couple of places (1, 2, and others). However, this method relays on all devs being aware of that and it repeats the pickle roundtrip code all over the place. We should instead make this a global test setup. Some options that come to my mind:

The manual pickle roundtrip code should then be deleted.

Allow arrow tables in read and write interface

We're using Apache Arrow as the ultimate tool to glue everything together. When writing data, we accept pandas dataframes, convert them to arrow tables and store them as parquet. When reading data it is the other way round. For some pipelines it is not necessary to convert the data ever to pandas and we could process the data directly without ever converting to pandas.

As a user I'd like to have the possibility to directly pass arrow tables to the kartothek pipeline to store them and have the option to read the data directly as an arrow table. Conversion to pandas should only be done if absolutely necessary. A current example for a necessary conversion would be the partition_on feature where we perform a groupby on the data.

Versioning, breaking changes and public API

I'd like to discuss our version schema and the way we move forward with breaking changes. In it's current state we have a stable release but we frequently want to introduce minor interface breaking changes which would, according to semantic versioning, require us to increase the major version. While I'm not opposed to increasing our version, I'd like to signal users when they actually should expect something to break, not when, technically speaking, a minor part of the API changes which shouldn't have an impact on users. We have a lot of public API atm which I wouldn't recommend for external usage (e.g. classes like DatasetBuilder, SchemaWrapper, etc.)

We currently have two versioning schemas:

  1. The specification versioning via the metadata_version
  2. The package version to signal breaking python API changes

I believe we need a better way to protect us from too much exposure as well as to protect users from unnecessary breakage.

I can currently come up with a few ways to handle this issue:

  1. Don't change anything. Stick strictly to semver and increase the versions accordingly. Try to keep breaking releases to a minimum (e.g. once a quarter or less often)
  2. Introduce api submodule for our modules which will be clearly flagged as the part of the external API where we guarantee no breakage between major releases. I wouldn't introduce this for the io submodule, though.
  3. Change versioning schema to something like https://calver.org (open for other suggestions. this is just one which is rather common) and clearly document changes if something breaks

`load_dataframes` in `io_components.metapartition.py` does not raise if table specified in `columns` argument doesn't exist

Problem description

When using io.iter.read_dataset_as_dataframes__iterator as in the MWE below, instead of raising that the specified table table2_typo doesn't exist, the entire table2 is loaded. If the user specifies columns, it should not silently load the entire table.
(see https://github.com/JDASoftwareGroup/kartothek/blob/master/kartothek/io_components/metapartition.py#L662)
The same happens, if one doesn't specify tables.

Example code

from functools import partial
from storefact import get_store_from_url
from tempfile import TemporaryDirectory
dataset_dir = TemporaryDirectory()
store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")

from kartothek.core.eager import store_dataframes_as_dataset
from kartothek.io.iter import read_dataset_as_dataframes__iterator

df1 = pd.DataFrame({b"int_col": [1], b"str_col": ["two"]})
df2 = pd.DataFrame({b"float_col": [1.0], b"whatever_col": [None]})

store_dataframes_as_dataset(
        dfs={"table1":df1, "table2":df2}, 
        store=store_factory, 
        dataset_uuid="dataset_uuid"
    )

iterator = read_dataset_as_dataframes__iterator(
        dataset_uuid="dataset_uuid",
        store=store_factory, 
        tables=["table1", "table2"],
        columns={
            "table1": ["int_col", "str_col"],
            "table2_typo": ["float_col"]
            }
    )

dfs = next(iterator)
assert not dfs["table1"].empty
assert "whatever_col" in dfs["table2"].columns

Used versions

alabaster==0.7.12
appnope==0.1.0
aspy.yaml==1.3.0
astroid==2.2.5
asv==0.4.1
atomicwrites==1.3.0
attrs==19.1.0
Babel==2.7.0
backcall==0.1.0
certifi==2019.6.16
cfgv==2.0.1
chardet==3.0.4
Click==7.0
cloudpickle==1.2.1
coverage==4.5.4
dask==2.2.0
decorator==4.4.0
distributed==2.2.0
docutils==0.15.2
entrypoints==0.3
flake8==3.7.8
flake8-mutable==1.2.0
fsspec==0.4.1
HeapDict==1.0.0
hypothesis==4.32.3
identify==1.4.5
idna==2.8
imagesize==1.1.0
importlib-metadata==0.19
importlib-resources==1.0.2
ipython==7.7.0
ipython-genutils==0.2.0
isort==4.3.21
jedi==0.14.1
Jinja2==2.10.1
-e git+https://github.com/SvoONs/kartothek.git@4f6959d1562b27fd798ce28768dbec4e8bdd4a4a#egg=kartothek
lazy-object-proxy==1.4.1
locket==0.2.0
MarkupSafe==1.1.1
mccabe==0.6.1
more-itertools==7.2.0
msgpack==0.6.1
nodeenv==1.3.3
numpy==1.17.0
packaging==19.1
pandas==0.25.0
parso==0.5.1
partd==1.0.0
pexpect==4.7.0
pickleshare==0.7.5
pluggy==0.12.0
pre-commit==1.18.0
prompt-toolkit==2.0.9
psutil==5.6.3
ptyprocess==0.6.0
py==1.8.0
pyarrow==0.13.0
pycodestyle==2.5.0
pyflakes==2.1.1
Pygments==2.4.2
pylint==2.3.1
pyparsing==2.4.2
pytest==5.0.1
pytest-cov==2.7.1
pytest-flake8==1.0.4
pytest-mock==1.10.4
python-dateutil==2.8.0
pytz==2019.2
PyYAML==5.1.2
requests==2.22.0
setuptools-scm==3.3.3
simplejson==3.16.0
simplekv==0.13.0
six==1.12.0
snowballstemmer==1.9.0
sortedcontainers==2.1.0
Sphinx==2.1.2
sphinx-rtd-theme==0.4.3
sphinxcontrib-applehelp==1.0.1
sphinxcontrib-devhelp==1.0.1
sphinxcontrib-htmlhelp==1.0.2
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.2
sphinxcontrib-serializinghtml==1.1.3
storefact==0.9.0
tblib==1.4.0
toml==0.10.0
toolz==0.10.0
tornado==6.0.3
traitlets==4.3.2
typed-ast==1.3.4
uritools==2.2.0
urllib3==1.25.3
virtualenv==16.7.2
wcwidth==0.1.7
wrapt==1.11.2
zict==1.0.0
zipp==0.5.2
zstandard==0.11.1

`update_dataset_from_ddf` does not accept empty dataframe

Problem description

When passing an empty ddf to update_dataset_from_ddf, expected behavior would be that this behaves the same, regardless of the setting of the shuffle parameter and gives an error that storing an empty dataset is not possible.. Instead, this causes an obscure exception deep down in numpy with shuffle=True:

File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/kartothek/io/dask/dataframe.py", line 213, in update_dataset_from_ddf
    sort_partitions_by=sort_partitions_by,
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/kartothek/io/dask/_update.py", line 31, in _update_dask_partitions_shuffle
    np.arange(ddf.npartitions), min(ddf.npartitions, num_buckets)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/numpy/lib/shape_base.py", line 754, in array_split
    raise ValueError('number sections must be larger than 0.')
ValueError: number sections must be larger than 0.```

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.