Comments (12)
Which tasks happen in which process?
- Select columns (main) <-- Implemented 2022/03/14 as select / __getitem__
- Concatenate tables (main) <-- Implemented 2022/03/13 as +, += and stack
- Copy tables (main) <-- Implemented 2022/03/13 as t.copy()
- create index (mpPool)
- Table.from_index(source_table, index) (mpPool)
- filter rows (mpPool) (create index: W, create tasks for datablocks: TM, exec.task: W)
- sort (mpPool) (create sort index, then tasks, then Table.from_index)
- groupby (mpPool) (create histogram: W, create merge task: TM, merge histogram: W)
- join (mpPool)
- data import (mpPool) <--- Implemented 2022/03/29
- data load (main) <--- Implemented 2022/03/29
- apply function (mpPool)
- Save to hdf5 (mpPool)
from tablite.
I'm starting the branch master --> hdf5.
The complexity will drop a lot by doing the following:
- StoredList and Common column vanish. Thereby the simplest data structure becomes a table with 1 column.
- The tables use the IO module for creating in-memory tables with HDF5 as the backend. This means that dropping to disk only requires replacing the IO module with a python filehandle.
- The metadata of the table is stored directly in the HDF5 file.
- As IObytes is automatically closed when python garbage collects, unclosed files can't hang around. Thereby atexit doesn't need to do anything. I only need to add def __del__(self): self._h5f.close()
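A minimal sketch of the __del__ idea, under assumptions: the Table class and its _h5f attribute are hypothetical here, and a plain file handle stands in for the h5py.File:

```python
import os
import tempfile

class Table:
    """Hypothetical sketch: a table that owns a file handle (standing in
    for an h5py.File) and closes it when garbage collected."""
    def __init__(self, path):
        self._h5f = open(path, "wb")  # in tablite this would be an h5py.File

    def __del__(self):
        # guard against double-close if __del__ runs more than once
        if not self._h5f.closed:
            self._h5f.close()

path = os.path.join(tempfile.mkdtemp(), "t.h5")
t = Table(path)
handle = t._h5f
del t                  # CPython's refcounting triggers __del__ immediately
print(handle.closed)   # True
```

With this in place there is nothing for atexit to clean up: every table closes its own file when it goes out of scope.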
from tablite.
I have mapped out all the requirements to continue the hdf5 branch in this notebook and think I've got it all covered.
Datatypes = check.
Performance = check.
from tablite.
Some notes:
- Tablite currently replicates tables the naive way. This is very wasteful, as adding a column {4} to create a new table recreates the columns from the source {1,2,3} and then adds the new column {4}.
- Another issue is that tablite doesn't support multiprocessing. Everything happens in python's single proc.
To overcome these issues, I'd like to implement a TaskManager that contains a shared_memory block holding all columns. As this "global name space" is made for multiprocessing, the TaskManager (main process) can delegate tasks to a multiprocessing pool of n-1 workers, using messages containing no more than:
- address of the shared memory block
- task description (join, filter, ..., custom operation)
- shared memory address space for the result.
The work can be sliced in various ways. As we expect each operation to be atomic and incremental, the work can be split by cpu_count. Hereby filtering N million rows of data can be divided evenly amongst P processors.
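The splitting step can be sketched as a small helper (the function name is illustrative, not tablite API):

```python
from multiprocessing import cpu_count

def split_into_tasks(n_rows, n_procs=None):
    """Hypothetical helper: split n_rows of work into one (start, stop)
    slice per processor, distributing any remainder evenly."""
    n_procs = n_procs or cpu_count()
    base, extra = divmod(n_rows, n_procs)
    tasks, start = [], 0
    for i in range(n_procs):
        stop = start + base + (1 if i < extra else 0)
        if stop > start:           # skip empty slices when n_rows < n_procs
            tasks.append((start, stop))
        start = stop
    return tasks

print(split_into_tasks(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Each (start, stop) pair, plus the shared memory address and the task description, is all a worker needs.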
Data Import
We can load data using Tablite's filereaders and load CSV data in chunks (already implemented). Each worker can load a different chunk.
Concatenate <--- Implemented 2022/03/13
Concatenation of tables of identical datatypes is trivial insofar as the TaskManager maintains the global namespace of columns:
tbl_1 = {'A': ref_1, 'B': ref_2} # columns 1 & 2
tbl_2 = {'A': ref_3, 'B': ref_4} # columns 3 & 4
tbl_4 = tbl_1 + tbl_2 == {'A': [ref_1, ref_3], 'B': [ref_2, ref_4]} # strict order of references.
tbl_4.columns == ['A','B'] # strict order of headings.
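The reference-based concatenation above can be sketched in plain Python (refs here are just strings standing in for column references in the global namespace):

```python
# Hypothetical sketch: tables as dicts mapping headings to ordered lists of
# column references; '+' merely merges the reference lists, copying no data.
def concat(tbl_a, tbl_b):
    assert list(tbl_a) == list(tbl_b), "headings must match in strict order"
    return {name: tbl_a[name] + tbl_b[name] for name in tbl_a}

tbl_1 = {'A': ['ref_1'], 'B': ['ref_2']}  # columns 1 & 2
tbl_2 = {'A': ['ref_3'], 'B': ['ref_4']}  # columns 3 & 4
tbl_4 = concat(tbl_1, tbl_2)
print(tbl_4)  # {'A': ['ref_1', 'ref_3'], 'B': ['ref_2', 'ref_4']}
```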
Filter <--- Implemented 2022/03/13 as Table.__getitem__(...)
If the new "column" merely contains the indices of the filtered result, the output of the filtering process becomes a simple shared array of booleans (True/False) with reference to the columns from the global namespace. This is well aligned with best practices from numpy, where boolean masks are used.
tbl_3 = {1,3,4} & index
To compute a further filter that uses tbl_3, the new bitarray is merely applied subject to tbl_3's index.
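The boolean-mask approach in numpy terms (the data is illustrative):

```python
import numpy as np

# Sketch of the mask idea: a filter is just a boolean array over the source
# column; the source data itself is never copied.
column = np.array([10, 20, 30, 40, 50])
mask = column > 25             # one True/False flag per row
filtered = column[mask]
print(filtered.tolist())       # [30, 40, 50]

# a second filter is applied subject to the first mask
mask2 = mask & (column < 50)
print(column[mask2].tolist())  # [30, 40]
```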
Sort
Sort requires a sortation index. The simplest way is to compute this in a single proc.
Things that look the same have the same index.
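In numpy terms the sortation index is just argsort, and "things that look the same have the same index" corresponds to a dense rank (example data is illustrative):

```python
import numpy as np

# The sortation index: Table.from_index can materialise the sorted table
# from this without touching the source data.
column = np.array([30, 10, 20, 10])
sort_index = np.argsort(column, kind="stable")
print(sort_index.tolist())          # [1, 3, 2, 0]
print(column[sort_index].tolist())  # [10, 10, 20, 30]

# equal values share the same dense index
_, dense = np.unique(column, return_inverse=True)
print(dense.tolist())               # [2, 0, 1, 0]
```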
Groupby & Pivot table.
Multikey groupby can happen in 3 steps:
- Create a sort index (int32, single proc)
- Split the sort index into blocks of work.
- As the keys are unique, write conflicts are impossible, so all procs can update the shared array concurrently (if this doesn't work, then create aggregate columns for each proc)
See numpy-groupies for low level implementations that utilise numpy, numba_jit, etc. for acceleration.
NB> The pivot table is best implemented as a sparse array and interpreted upon demand. Most pivot tables have data in less than 25% of the cells.
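A single-proc sketch of the aggregation phase, assuming numpy only (numpy-groupies would replace this in practice). Because each group id is unique, per-group writes cannot conflict, which is what makes concurrent updates of a shared result array safe:

```python
import numpy as np

keys   = np.array(["a", "b", "a", "c", "b", "a"])
values = np.array([1, 2, 3, 4, 5, 6])

# map keys to dense group ids, then aggregate histogram-style
groups, ids = np.unique(keys, return_inverse=True)
sums = np.bincount(ids, weights=values)
print(dict(zip(groups.tolist(), sums.tolist())))  # {'a': 10.0, 'b': 7.0, 'c': 4.0}
```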
Join
The join operation can happen in 3 steps:
- Compute a histogram of keys using groupby
- Compute the new table size by calculating the sum product of keypairs (this will differ between inner, left and outer join)
- Compute the pair of indices for the new output (2 x int32).
The pair of indices can now be looked up on demand.
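Step 2, the sum product of keypairs, can be sketched with plain Counters (illustrative data):

```python
from collections import Counter

# The inner-join output size is the sum-product of the key histograms
# from the left and right tables.
left_keys  = ["a", "a", "b", "c"]
right_keys = ["a", "b", "b", "d"]

hl, hr = Counter(left_keys), Counter(right_keys)
inner_size = sum(hl[k] * hr[k] for k in hl.keys() & hr.keys())
print(inner_size)  # 2*1 ('a') + 1*2 ('b') = 4

# a left join adds the unmatched left rows on top:
left_size = inner_size + sum(v for k, v in hl.items() if k not in hr)
print(left_size)   # 4 + 1 ('c') = 5
```

Knowing the output size up front lets the TaskManager pre-allocate the two index arrays before any worker runs.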
Exceeding memory limits
For the TaskManager to assign work to the Workers, a topological sort is required to assure order of execution between tasks.
Even in ".ipynb notebook" mode, where the user may execute tasks out of order, the TaskManager must detect dependencies and raise exceptions at the level of instructions. Raising during a run in a remote processor is not adequate.
If the TaskManager detects that the memory footprint exceeds 95% of the available memory, it can drop memory blocks to disk.
For this the TaskMap becomes beneficial: items that were computed a long time ago (in CPU time) but aren't involved in any pending tasks can be dropped to disk. Intuitively it seems valid to let the TaskManager use a Least Recently Used (LRU) policy.
If a task by itself appears to exceed the memory footprint (for example materializing a table with an outer join), the LRU-caching approach will drop completed parts of the outer join to disk as soon as they're passive.
Note that multiprocessing.shared_memory.SharedMemory(name=..., create=True, size=...) blocks:
- can outlive the process that created them (good)
- can be destroyed using unlink() (very good)
See example on python.org
Garbage collection.
When each table is deleted, we can override __del__ with an update to the TaskManager so its reference count is maintained. It will then be up to the TaskManager to determine whether the columns are deleted from memory or disk.
from tablite.
Multiprocessing - a batch of one.
Assume tbl_7 has the columns 'a','b','c','d','date','hour'. Using the signature below, it becomes possible to perform parallel computation with a simple function.
tbl_7.add_columns_from_function(function, source, index_on=None, func_args=None, func_kwargs=None)
:param: function: a Callable
:param: source: data source, f.x. ['a','b','c']
:param: index_on: columns used to split the source into chunks f.x. ['date', 'hour']
:param: func_args: optional: arguments for the function
:param: func_kwargs: optional: kwargs as a config for the function.
The interpretation of this function takes place in the TaskManager in the following steps:
- Check the inputs are valid.
- Compute the index for chunks (index_on) if it exists.
- Create tasks for multiprocessing.
- Apply multiprocessing.Pool.map on the chunks.
- Collect the results.
Sample task list for batch like operations would look like this:
tasklist = [(tbl_7,0,21,f),(tbl_7,21,56,f),(tbl_7,56,88,f),(tbl_7,88,102,f), ...]
Each task contains (table, slice_start, slice_end, function, batch=True) and implies: select the slice from the table and give it to the function as a batch.
To process the chunks in batches of one (e.g. each row), the task list would be:
tasklist = [(tbl_7,0,21,f, False),(tbl_7,21,56,f, False),(tbl_7,56,88,f, False),(tbl_7,88,102,f, False), ...]
The only requirement this approach imposes on the user is to assure that the arguments are acceptable as columns of data, such as:
def my_func(a=[1,2,3,4], b=[5,6,7,8], c=[2,4,6,8]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
    )
    return result  # returns a single column
And that the result is a tuple of columns. We do not restrict the usage in the signature from including the chunk index, nor from returning multiple columns. Here is another example:
def my_new_func(a=[1,2,3,4], b=[5,6,7,8], c=[2,4,6,8], hour=[6,12,18,0]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
        [h + 1 for h in hour]
    )
    return result  # returns two columns
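Putting the pieces together, a sketch of how the task list could be built and dispatched. All names here are illustrative, and plain map() stands in for multiprocessing.Pool.map:

```python
# Hypothetical sketch of batch dispatch: build (start, stop, func) tasks
# over a column dict and apply the user function per slice. In the real
# TaskManager, multiprocessing.Pool.map would replace the plain map().
def make_tasks(n_rows, chunk_size, func):
    return [(start, min(start + chunk_size, n_rows), func)
            for start in range(0, n_rows, chunk_size)]

def run_task(table, task):
    start, stop, func = task
    return func(a=table['a'][start:stop],
                b=table['b'][start:stop],
                c=table['c'][start:stop])

def my_func(a, b, c):
    return ([a[ix] + b[ix] * c[ix] for ix in range(len(a))],)

tbl_7 = {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [2, 4, 6, 8]}
tasks = make_tasks(4, 2, my_func)
chunks = map(lambda t: run_task(tbl_7, t), tasks)   # Pool.map in real life
column = [v for (part,) in chunks for v in part]
print(column)  # [11, 26, 45, 68]
```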
from tablite.
POC of memory manager now done.
POC of task manager now done.
Next: Put them together.
from tablite.
Notes on replace missing values
from tablite.
Notes on importing vs loading.
What is the better way of handling imports?
Here is an overview of the file a.zip:
a.zip/ (hdf5 file to be imported)
b.zip/ (h5 group)
c.csv (h5 group)
col1 (dataset)
col2
....
x.xlsx
sheet1
A (dataset)
B
....
sheet2
C
D
c.zip
t.txt.h5
colA
colB
d.csv
col_1 (dataset)
col_2
col_3
I then create the hdf5 file a.zip.h5
as:
a.zip.h5 contents
b.zip : <HDF5 group "/b.zip" (2 members)>
c.csv : <HDF5 group "/b.zip/c.csv" (2 members)>
config : {"import_as": "csv", "newline": "\r\n", "text_qualifier": "\"", "delimiter": ",", "first_row_headers": true, "columns": {"col1": "i8", "col2": "int64"}}
col1 : <HDF5 dataset "col1": shape (6,), type "<i8">
col2 : <HDF5 dataset "col2": shape (6,), type "<i8">
x.xlsx : <HDF5 group "/b.zip/x.xlsx" (2 members)>
sheet1 : <HDF5 group "/b.zip/x.xlsx/sheet1" (2 members)>
A : <HDF5 dataset "A": shape (2,), type "|S6">
B : <HDF5 dataset "B": shape (3,), type "|S8">
sheet2 : <HDF5 group "/b.zip/x.xlsx/sheet2" (2 members)>
C : <HDF5 dataset "C": shape (200,), type "<i8">
D : <HDF5 dataset "D": shape (200,), type "<i8">
c.zip : <HDF5 group "/c.zip" (1 members)>
t.txt : <HDF5 group "/c.zip/t.txt" (1 members)>
logs : <HDF5 dataset "logs": shape (100,), type "<f4">
d.csv : <HDF5 group "/d.csv" (3 members)>
col_1 : <HDF5 dataset "col_1": shape (9,), type "<f4">
col_2 : <HDF5 dataset "col_2": shape (9,), type "<f4">
col_3 : <HDF5 dataset "col_3": shape (9,), type "<f4">
so that the hdf5 file reflects the zip file's structure.
If the size or config changes, the file is re-imported.
** In retrospect I think this is a bad idea. The fact that this could work, doesn't mean it should. **
CONCLUSION: .txt, .csv, .xlsx and .h5 files will be supported. Users can unzip their files before use.
from tablite.
only show importable compatible formats.
from tablite.
We have a test like this in the test suite:
table1 = Table()
table1.add_column('A', data=[1, 2, 3])
table1.add_column('B', data=['a', 'b', 'c'])
table2 = table1.copy()  # kept for checking.
# Now we change table1:
table1['A', 'B'] = [ [4, 5, 6], [1.1, 2.2, 3.3] ]
# then we check table2 isn't broken.
assert table2['A'] == [1, 2, 3]
assert table2['B'] == ['a', 'b', 'c']
My question: Is this test still valid?
My mind is set on the reality that tables become immutable, but I can handle the deduplication in the memoryManager, so there's no real problem in permitting this. It only removes the constraint that tables should be immutable.
Conclusions, a few hours later...
<-- Implemented. Tables are now mutable and permit updates. These will be quite slow, so the most efficient approach is to do slice updates rather than individual values.
Fastest of course is to create a new column and drop the old.
from tablite.
implemented in commit #4844bc87
from tablite.