
More disk support? (tablite, closed)

root-11 commented on June 20, 2024
More disk support?


Comments (12)

root-11 commented on June 20, 2024

Which tasks happen in which process?

  1. Select columns (main) <-- Implemented 2022/03/14 as select __getitem__
  2. Concatenate tables (main) <-- Implemented 2022/03/13 as +, += & stack
  3. Copy tables (main) <--- Implemented 2022/03/13 as t.copy()
  4. create index (mpPool)
  5. Table.from_index(source_table, index) (mpPool)
  6. filter rows (mpPool) (create index: W, create tasks for datablocks: TM, exec.task: W)
  7. sort (mpPool) (create sort index, then tasks, then Table.from_index)
  8. groupby (mpPool) (create histogram: W, create merge task: TM, merge histogram: W)
  9. join (mpPool)
  10. data import (mpPool) <--- Implemented 2022/03/29
  11. data load (main) <--- Implemented 2022/03/29
  12. apply function (mpPool)
  13. Save to hdf5 (mpPool)

root-11 commented on June 20, 2024

I'm starting the branch master --> hdf5.

The complexity will drop a lot by doing the following:

  1. StoredList and Common column vanish. Thereby the simplest data structure becomes a table with 1 column.
  2. The tables use the IO module for creating in-memory tables with HDF5 as the backend. This means that dropping to disk only requires replacing the IO module with a python file handle.
  3. The metadata of the table is stored directly in the HDF5 file.
  4. As io.BytesIO is automatically closed when python garbage collects it, unclosed files can't hang around, so atexit doesn't need to do anything. I only need to add def __del__(self): self._h5f.close() (a sketch follows below).
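A minimal sketch of that idea (h5py accepts file-like objects; the Table class below is illustrative only, not tablite's actual layout):

import io
import h5py

class Table:
    """Illustrative: a table backed by an in-memory HDF5 file."""
    def __init__(self):
        self._buffer = io.BytesIO()                    # in-memory backend; swap for a real file to drop to disk
        self._h5f = h5py.File(self._buffer, mode="w")  # h5py accepts file-like objects
        self._h5f.attrs["metadata"] = "{}"             # table metadata lives inside the HDF5 file

    def __del__(self):
        try:
            self._h5f.close()                          # no unclosed handles linger after garbage collection
        except Exception:
            pass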

root-11 commented on June 20, 2024

I have mapped out all the requirements to continue the hdf5 branch in this notebook and think I've got it all covered.

Datatypes = check.
Performance = check.

root-11 commented on June 20, 2024

(image attached)

root-11 commented on June 20, 2024

Some notes:

  • Tablite currently replicates the tables the naive way. This is very wasteful, as adding a column {4} to create a new table recreates the columns from the source {1,2,3} and adds the new column {4}.

  • Another issue is that tablite doesn't support multiprocessing. Everything happens in Python's single process.

To overcome these issues, I'd like to implement a TaskManager that holds a shared_memory block containing all columns.

As this "global name space" is made for multiprocessing, the TaskManager (main process) can delegate tasks to a multiprocessing pool of n-1 workers, using messages containing no more than:

  • address of the shared memory block
  • task description (join, filter, ..., custom operation)
  • shared memory address space for the result.

The work can be sliced in various ways. As we expect each operation to be atomic and incremental, the work can be split by cpu_count. Hereby filtering N million rows of data can be divided evenly amongst P processors.
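A rough sketch of that idea using Python's multiprocessing.shared_memory, with a trivial filter standing in for the task description (all names and the filter itself are illustrative, not tablite's implementation):

import numpy as np
from multiprocessing import Pool, cpu_count, shared_memory

def filter_task(args):
    """Worker: attach to the shared block by name and filter one slice."""
    name, start, stop = args
    shm = shared_memory.SharedMemory(name=name)
    data = np.ndarray((stop - start,), dtype=np.int64, buffer=shm.buf[start * 8:stop * 8])
    mask = data > 5                # the "task": a trivial filter; the result is a fresh array, not shared memory
    del data                       # release the view before closing the block
    shm.close()
    return start, mask

if __name__ == "__main__":
    source = np.arange(1_000_000, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=source.nbytes)
    view = np.ndarray(source.shape, dtype=source.dtype, buffer=shm.buf)
    view[:] = source               # place the column in shared memory once

    step = len(source) // cpu_count()
    tasks = [(shm.name, i, min(i + step, len(source))) for i in range(0, len(source), step)]
    with Pool() as pool:
        results = pool.map(filter_task, tasks)   # messages contain only the block name, a slice and the task
    del view
    shm.close()
    shm.unlink()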

Data Import

We can load data using Tablite's filereaders and load CSV data in chunks (already implemented). Each worker can load a different chunk.

Concatenate <--- Implemented 2022/03/13

Concatenation of tables of identical datatypes is trivial, insofar as the TaskManager maintains the global namespace of columns:

tbl_1 = {'A': ref_1, 'B': ref_2}  # columns 1 & 2
tbl_2 = {'A': ref_3, 'B': ref_4}  # columns 3 & 4
tbl_4 = tbl_1 + tbl_2  # == {'A': [ref_1, ref_3], 'B': [ref_2, ref_4]}   strict order of references.

tbl_4.columns == ['A','B'] # strict order of headings.
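A rough, runnable sketch of reference-based concatenation, with plain dicts and strings standing in for tables and block references:

tbl_1 = {'A': ['ref_1'], 'B': ['ref_2']}
tbl_2 = {'A': ['ref_3'], 'B': ['ref_4']}

def concatenate(t1, t2):
    if list(t1) != list(t2):                               # strict order of headings
        raise ValueError("headings must match")
    return {name: t1[name] + t2[name] for name in t1}      # only references are appended, never data

tbl_4 = concatenate(tbl_1, tbl_2)
assert tbl_4 == {'A': ['ref_1', 'ref_3'], 'B': ['ref_2', 'ref_4']}
assert list(tbl_4) == ['A', 'B']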

Filter <--- Implemented 2022/03/13 as Table.__getitem__(...)

If the new "column" merely contains the indices of the filtered result, the output of the filtering process becomes a simple shared array of booleans (True/False) with references to the columns from the global namespace. This is well aligned with best practice from numpy, where boolean masks are used.

tbl_3 = {1,3,4} & index

To compute a filter that uses tbl_3, the new bitarray is merely applied subject to tbl_3's index.
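A rough numpy sketch of the boolean-mask approach (values are illustrative):

import numpy as np

A = np.array([1, 5, 10, 15, 20])
mask = A > 5                          # shared array of booleans, one per row
tbl_3_index = np.nonzero(mask)[0]     # tbl_3 is just these indices plus references to the source columns

second_mask = A[tbl_3_index] < 20     # a filter on tbl_3 is applied subject to tbl_3's index
tbl_5_index = tbl_3_index[second_mask]
# tbl_3_index == [2, 3, 4], tbl_5_index == [2, 3]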

Sort

Sort requires a sortation index. The simplest way is to compute this in a single process.
Things that look the same get the same index.
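A rough numpy sketch of the sortation index (values are illustrative):

import numpy as np

values = np.array(['b', 'a', 'c', 'a'])
order = np.argsort(values, kind='stable')            # sortation index: positions in sorted order
_, rank = np.unique(values, return_inverse=True)     # things that look the same get the same index
# order == [1, 3, 0, 2], rank == [1, 0, 2, 0]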

Groupby & Pivot table.

Multikey groupby can happen in 3 steps:

  1. Create a sort index (int32, single proc)
  2. Split the sort index into blocks of work.
  3. As the keys are unique, write conflicts are impossible, so all procs can update the shared array concurrently (if this doesn't work, then create aggregate columns for each proc)

See numpy-groupies for low level implementations that utilise numpy, numba_jit, etc. for acceleration.
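A rough pure-numpy sketch of the sort-index approach for a single aggregate (sum), shown in a single process for clarity:

import numpy as np

keys = np.array([2, 1, 2, 1, 2])
values = np.array([10.0, 20.0, 30.0, 40.0, 50.0])

order = np.argsort(keys, kind='stable')                  # step 1: sort index
sorted_keys, sorted_values = keys[order], values[order]
boundaries = np.flatnonzero(np.diff(sorted_keys)) + 1    # step 2: split into blocks of work, one per key
blocks = np.split(sorted_values, boundaries)
result = {k: b.sum() for k, b in zip(np.unique(sorted_keys), blocks)}   # step 3: aggregate per block
# result == {1: 60.0, 2: 90.0}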

NB: The pivot table is best implemented as a sparse array and interpreted upon demand. Most pivot tables have data in less than 25% of the cells.

Join

The join operation can happen in 3 steps:

  1. Compute a histogram of keys using groupby
  2. Compute the new table size by calculating the sum product of keypairs (this will differ between inner, left and outer join)
  3. Compute the pair of indices for the new output (2 x int32).

The pair of indices can now be looked up on demand.
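A rough sketch of computing the index pairs for an inner join (keys are illustrative):

from collections import defaultdict

left_keys = ['a', 'b', 'a']
right_keys = ['a', 'c', 'a']

histogram = defaultdict(list)                   # step 1: histogram of keys on the right-hand side
for r_ix, key in enumerate(right_keys):
    histogram[key].append(r_ix)

pairs = []                                      # step 3: the pair of indices for the new output
for l_ix, key in enumerate(left_keys):
    for r_ix in histogram.get(key, []):         # inner join: unmatched keys drop out
        pairs.append((l_ix, r_ix))

# step 2 is implicit here: len(pairs) is the sum-product of matching key counts (2 x 2 = 4)
assert pairs == [(0, 0), (0, 2), (2, 0), (2, 2)]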

Exceeding memory limits

For the TaskManager to assign work to the Workers, a topological sort is required to assure the order of execution between tasks.
Even in ".ipynb notebook" mode, where the user may execute tasks out of order, the TaskManager must detect dependencies and raise an exception at the level of instructions. Raising during a run in a remote process is not adequate.

If the TaskManager detects that the memory footprint exceeds 95% of the available memory, it can drop memory blocks to disk.

For this the TaskMap becomes beneficial: items that were computed a long time ago (in CPU time) but aren't involved in any pending tasks can be dropped to disk. Intuitively it seems valid to let the TaskManager use a Least Recently Used (LRU) policy.

If a task by itself appears to exceed the memory footprint (for example, materializing a table with an outer join), the LRU-caching approach will drop completed parts of the outer join to disk as soon as they're passive.
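A rough sketch of the LRU idea; psutil stands in for the 95% memory check and pickle stands in for the real drop-to-disk mechanism (all names are illustrative):

from collections import OrderedDict
import pickle

import psutil

class LRUBlockCache:
    """Tracks blocks by recency and drops the least recently used ones to disk under memory pressure."""
    def __init__(self, limit=0.95):
        self.limit = limit
        self.blocks = OrderedDict()

    def touch(self, name, data=None):
        if data is not None:
            self.blocks[name] = data
        if name in self.blocks:
            self.blocks.move_to_end(name)                             # most recently used goes last
        while self.blocks and psutil.virtual_memory().percent / 100 > self.limit:
            old_name, old_data = self.blocks.popitem(last=False)      # least recently used goes first
            with open(f"{old_name}.pkl", "wb") as fh:                 # drop the passive block to disk
                pickle.dump(old_data, fh)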

Note that multiprocessing.shared_memory.SharedMemory(name=..., create=True, size=...) blocks:

  • can outlive the process that created them (good)
  • can be destroyed using unlink() (very good)

See example on python.org

Garbage collection.

When each table is deleted, we can override __del__ with an update to the TaskManager so its reference count is maintained. It will then be up to the TaskManager to determine whether the columns are deleted from memory or disk.
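A rough sketch of that handshake (the MemoryManager here and the block keys are illustrative):

from collections import defaultdict

class MemoryManager:
    """Illustrative: keeps a reference count per column block."""
    def __init__(self):
        self.refcount = defaultdict(int)

    def register(self, key):
        self.refcount[key] += 1

    def release(self, key):
        self.refcount[key] -= 1
        if self.refcount[key] <= 0:
            del self.refcount[key]        # last user gone: free the block from memory or disk

class Table:
    manager = MemoryManager()

    def __init__(self, columns):
        self.columns = columns            # e.g. {'A': 'block_1', 'B': 'block_2'}
        for key in self.columns.values():
            self.manager.register(key)

    def __del__(self):
        for key in self.columns.values():
            self.manager.release(key)     # keep the manager's reference count in sync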

root-11 commented on June 20, 2024

Multiprocessing - a batch of one.

(image attached)

Assume tbl_7 has the columns 'a','b','c','d','date','hour'. Using the signature below, it becomes possible to perform parallel computation on a simple function.

tbl_7.add_columns_from_function(function, source, index_on=None, func_args=None, func_kwargs=None)

:param: function: a Callable
:param: source: data source, e.g. ['a','b','c']
:param: index_on: columns used to split the source into chunks, e.g. ['date', 'hour']
:param: func_args: optional: arguments for the function
:param: func_kwargs: optional: kwargs as a config for the function.

The interpretation of this function takes place in the TaskManager in the following steps:

  1. Check the inputs are valid.
  2. Compute the index for chunks (index_on) if it exists.
  3. Create tasks for multiprocessing.
  4. Apply multiprocessing.Pool.map on the chunks.
  5. Collect the results.

A sample task list for batch-like operations would look like this:

tasklist = [(tbl_7,0,21,f),(tbl_7,21,56,f),(tbl_7,56,88,f),(tbl_7,88,102,f), ...]   

Each task contains (table, slice_start, slice_end, function, batch=True) and means: select the slice from the table and give it to the function as a batch.

To process the chunks in batches of one (e.g. each row), the task list would be:

tasklist = [(tbl_7,0,21,f, False),(tbl_7,21,56,f, False),(tbl_7,56,88,f, False),(tbl_7,88,102,f, False), ...]   

The only requirement this approach imposes on the user is to ensure that the arguments are acceptable as columns of data, such as:

def my_func(a=[1, 2, 3, 4], b=[5, 6, 7, 8], c=[2, 4, 6, 8]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
    )
    return result  # returns a single column

And that the result is a tuple of columns. We do not restrict the usage in the signature from including the chunk index, nor from returning multiple columns. Here is another example:

def my_new_func(a=[1, 2, 3, 4], b=[5, 6, 7, 8], c=[2, 4, 6, 8], hour=[6, 12, 18, 0]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
        [h + 1 for h in hour],
    )
    return result  # returns two columns
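Given the proposed signature, a call might look like this (a sketch only, not the final API):

tbl_7.add_columns_from_function(
    my_new_func,
    source=['a', 'b', 'c', 'hour'],     # handed to the function as columns of data
    index_on=['date', 'hour'],          # chunks are split on unique (date, hour) combinations
)
# the TaskManager appends the two returned columns to tbl_7.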

root-11 commented on June 20, 2024

POC of memory manager now done.
POC of task manager now done.
Next: Put them together.

root-11 commented on June 20, 2024

Notes on replacing missing values. (image attached)

root-11 commented on June 20, 2024

Notes on importing vs loading. (image attached)

What is the better way of handling imports?

Here is an overview of the file a.zip:

a.zip/  (hdf5 file to be imported)
	b.zip/   (h5 group)
		c.csv  (h5 group)
			col1   (dataset)
			col2
			....
		x.xlsx
			sheet1
				A   (dataset)
				B
				....
			sheet2
				C
				D   
	c.zip
		t.txt.h5
			colA
			colB
	d.csv
		col_1  (dataset)
		col_2
		col_3

I then create the hdf5 file a.zip.h5 as:

a.zip.h5 contents
 b.zip : <HDF5 group "/b.zip" (2 members)>
   c.csv : <HDF5 group "/b.zip/c.csv" (2 members)>
     config : {"import_as": "csv", "newline": "\r\n", "text_qualifier": "\"", "delimiter": ",", "first_row_headers": true, "columns": {"col1": "i8", "col2": "int64"}}
     col1 : <HDF5 dataset "col1": shape (6,), type "<i8">
     col2 : <HDF5 dataset "col2": shape (6,), type "<i8">
   x.xlsx : <HDF5 group "/b.zip/x.xlsx" (2 members)>
     sheet1 : <HDF5 group "/b.zip/x.xlsx/sheet1" (2 members)>
       A : <HDF5 dataset "A": shape (2,), type "|S6">
       B : <HDF5 dataset "B": shape (3,), type "|S8">
     sheet2 : <HDF5 group "/b.zip/x.xlsx/sheet2" (2 members)>
       C : <HDF5 dataset "C": shape (200,), type "<i8">
       D : <HDF5 dataset "D": shape (200,), type "<i8">
 c.zip : <HDF5 group "/c.zip" (1 members)>
   t.txt : <HDF5 group "/c.zip/t.txt" (1 members)>
     logs : <HDF5 dataset "logs": shape (100,), type "<f4">
 d.csv : <HDF5 group "/d.csv" (3 members)>
   col_1 : <HDF5 dataset "col_1": shape (9,), type "<f4">
   col_2 : <HDF5 dataset "col_2": shape (9,), type "<f4">
   col_3 : <HDF5 dataset "col_3": shape (9,), type "<f4">

so that the hdf5 file reflects the zip file's structure.

If the size or config changes, the file is re-imported.

**In retrospect I think this is a bad idea. The fact that this could work doesn't mean it should.**

CONCLUSION: .txt, .csv, .xlsx and .h5 files will be supported. Users can unzip their files before use.

root-11 commented on June 20, 2024

Only show importable, compatible formats.

root-11 commented on June 20, 2024

We have a test like this in the test suite:

from tablite import Table

table1 = Table()
table1.add_column('A', data=[1, 2, 3])
table1.add_column('B', data=['a', 'b', 'c'])

table2 = table1.copy()  # kept for checking.

# Now we change table1:

table1['A', 'B'] = [[4, 5, 6], [1.1, 2.2, 3.3]]

# then we check table2 isn't broken.

assert table2['A'] == [1, 2, 3]
assert table2['B'] == ['a', 'b', 'c']

My question: Is this test still valid?
My mind was set on tables becoming immutable, but I can handle the deduplication in the memory manager, so there's no real problem in permitting this. It only removes the constraint that tables should be immutable.

Conclusion, a few hours later....

Implemented. Tables are now mutable and permit updates. These will be quite slow, so the most efficient approach is to do slice updates rather than updating individual values.

Fastest, of course, is to create a new column and drop the old.

root-11 commented on June 20, 2024

implemented in commit #4844bc87
