fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.

Home Page: https://fugue-tutorials.readthedocs.io/

License: Apache License 2.0

Languages: Python 98.31%, Jupyter Notebook 1.14%, Makefile 0.28%, JavaScript 0.25%, Shell 0.02%
Topics: spark, dask, data-practitioners, machine-learning, distributed-systems, distributed-computing, distributed, sql, pandas

fugue's Issues

[FEATURE] csv infer schema consistency

Currently, loading CSV on different engines has different behavior for schema inference: Spark doesn't infer the schema by default, but pandas does. We should have a common switch, like header, to control this with a default value.
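
A hedged sketch of what a unified switch could look like; the infer_schema parameter name and default are assumptions for illustration, not the final API:

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    # hypothetical: with infer_schema=False every engine would load columns as strings
    dag.load("/tmp/data.csv", header=True, infer_schema=False).show()
    # hypothetical: with infer_schema=True every engine would infer column types
    dag.load("/tmp/data.csv", header=True, infer_schema=True).show()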

[FEATURE] Output transformer

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Currently, Fugue transformers must return something. Sometimes we just want to process something in a distributed way and then stop. For example, for a big dataframe we can partition by certain keys and then use a transformer to serialize and save each partition (in its own way).

With the current design, we need to transform and then persist to trigger this execution. It works, but conceptually it's a hack. We need a clear definition of this as a new type of extension, maybe OutputTransformer?

We also need to add the programming interface, the SQL interface, and the interfaceless way to use it, and update the docs.
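
A hedged sketch of what such an extension could look like when saving each partition; the out_transform-style call and the save_partition function are illustrative assumptions, not the final API:

import pandas as pd

# hypothetical output transformer: consumes one partition and returns nothing
def save_partition(df: pd.DataFrame) -> None:
    key = df["key"].iloc[0]
    # each partition is serialized and saved in its own way
    df.to_parquet(f"/tmp/partitions/{key}.parquet")

# hypothetical usage: there is no output dataframe, so no persist() hack is needed
# dag.df(big_df).partition(by=["key"]).out_transform(save_partition)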

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Fugueless transform and sql

Make transform and sql utility functions so that for simple (one-step) use cases, users only need to call the function, without starting a FugueWorkflow.
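
A hedged sketch of how such a one-call utility might look; the exact signature is an assumption, not the final API:

import pandas as pd

# schema: *,b:int
def plus_one(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=df["a"] + 1)

pdf = pd.DataFrame({"a": [0, 1]})

# hypothetical one-step utility: no FugueWorkflow needed
# result = transform(pdf, plus_one)                  # default (native) engine
# result = transform(pdf, plus_one, engine="spark")  # or a distributed engine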

[FEATURE] UNION on transforms

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like

CREATE DATA [[0]] SCHEMA a:int
TRANSFORM USING t1 UNION TRANSFORM USING t2
PRINT

Describe alternatives you've considered
This works now

CREATE DATA [[0]] SCHEMA a:int
SELECT * FROM (TRANSFORM USING t1) UNION SELECT * FROM (TRANSFORM USING t2)
PRINT

Additional context
INTERSECT and EXCEPT should also be implemented

[BUG] There are some tests failing in Windows

Minimal Code To Reproduce

python3 -bb -m pytest tests/fugue

Describe the bug
On my system, the tests produced the following errors:

FAILED tests/fugue/dataframe/test_utils.py::test_serialize_df - fs.errors.InvalidCharsInPath: path 'C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_serialize_df0\1.pkl' contains invalid characters
FAILED tests/fugue/execution/test_naive_execution_engine.py::NativeExecutionEngineTests::test_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_io0\a
FAILED tests/fugue/execution/test_naive_execution_engine.py::NativeExecutionEngineBuiltInTests::test_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_io1\a
FAILED tests/fugue/utils/test_io.py::test_parquet_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_parquet_io0\a.parquet
FAILED tests/fugue/utils/test_io.py::test_csv_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_csv_io0\a.csv
FAILED tests/fugue/utils/test_io.py::test_json - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_json0\a.json

Expected behavior
Tests should pass regardless of the OS.

Environment (please complete the following information):

  • Python version: 3.6.9
  • OS: Windows

[BUG] file.latest.parquet is not recognized

Minimal Code To Reproduce

NA

Describe the bug
The problem is here

self.suffix in _FORMAT_MAP,

It takes .latest.parquet as the suffix, so the file is rejected.
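
A minimal sketch of one possible fix, assuming the intent is to match only the final extension; the _FORMAT_MAP contents here are illustrative:

import os

_FORMAT_MAP = {".parquet": "parquet", ".csv": "csv", ".json": "json"}  # illustrative

def infer_format(path: str) -> str:
    # os.path.splitext keeps only the last extension, so
    # "file.latest.parquet" yields ".parquet" instead of ".latest.parquet"
    suffix = os.path.splitext(path)[1].lower()
    assert suffix in _FORMAT_MAP, f"{suffix} is not a supported file format"
    return _FORMAT_MAP[suffix]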

Expected behavior
The file should be accepted.

Environment (please complete the following information):

  • Backend: pandas/dask/ray?
  • Backend version:
  • Python version: 3.6.9
  • OS: linux

[FEATURE] Passing dataframes cross workflow

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like

dag1 = FugueWorkflow()
df = dag1.df([[0]],"a:int")
df.yield_as("output")
dag1.run()

dag2 = FugueWorkflow()
df = dag2.df(dag1["output"])
df.show()
dag2.run()

To pass a dataframe across workflows, the dataframe must be stored in persistent storage (saved to disk), so dag1["output"] should just be a reference (a file path). Notice that within the same workflow, yield may not be necessary.

For Fugue SQL:

dag1 = FugueSQLWorkflow()
dag1("""
output = CREATE [[0]] SCHEMA a:int
YIELD output
# or YIELD output AS out
""")
dag1.run()

dag2 = FugueSQLWorkflow()
dag2("""
PRINT df
""", df = dag1["output"])
dag2.run()

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] presort should also take array as input

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like

def _parse_presort_exp( # noqa: C901

Currently, presort can only be a string expression, but the programming interface should also be able to use arrays:

"a,b" == ["a", "b"] == [("a", True),("b", True)]
"a desc,b" == [("a", False), "b"] == [("a", False),("b", True)]

Describe alternatives you've considered
NA

Additional context
NA

[BUG] datetime column (pd.DataFrame) returned in Transformer is causing spark error

Minimal Code To Reproduce

import pandas as pd
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

# schema: a:datetime
def t(sdf:pd.DataFrame) -> pd.DataFrame:
    sdf["a"]=pd.to_datetime(sdf["a"])
    return sdf

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag.df([["2020-01-01"]], "a:str").transform(t).show()

Describe the bug

TypeError: field a: TimestampType can not accept object Timestamp('2020-01-01 00:00:00') in type <class 'pandas._libs.tslibs.timestamps.Timestamp'>

Expected behavior
This should work.
We should add datetime tests to the general execution engine test suites.

Environment (please complete the following information):

  • Backend: spark
  • Backend version: 3
  • Python version: 3.6.9
  • OS: linux

[FEATURE] Fillna as builtin

Is your feature request related to a problem? Please describe.
We should include a fillna method in the execution engine.

Describe the solution you'd like
It just needs to be simple, like the Spark implementation. No need for forward fill or backfill.

Additional context
It will have a very similar implementation to the existing dropna method in the execution engine.
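
A hedged sketch of what the engine-level method could look like, loosely following Spark's DataFrame.fillna; the name and signature are assumptions, and pandas is used here only for illustration:

from typing import Any, List, Optional
import pandas as pd

def fillna(df: pd.DataFrame, value: Any, subset: Optional[List[str]] = None) -> pd.DataFrame:
    # like Spark: a dict maps column -> fill value; a scalar fills the selected (or all) columns
    if isinstance(value, dict):
        return df.fillna(value)
    cols = subset if subset is not None else list(df.columns)
    return df.fillna({c: value for c in cols})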

[FEATURE] sample table

The pandas DataFrame API is the one we try to follow:

df.sample(n, frac, replacement=False, random_seed=None)

[FEATURE] Make sure programming interface join follows SQL join convention

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like
Pandas joins and SQL joins treat NULLs differently (SQL does not match NULLs on join keys). We should make sure the join behavior follows the SQL join convention so it's consistent with Fugue SQL behavior.
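
A small example of the difference, relying on pandas' behavior of matching null join keys, which SQL inner joins do not do:

import pandas as pd

left = pd.DataFrame({"k": [None, "x"], "v1": [1, 2]})
right = pd.DataFrame({"k": [None, "x"], "v2": [3, 4]})

# pandas matches the null keys and returns 2 rows;
# a SQL inner join would match only "x" and return 1 row
print(pd.merge(left, right, on="k", how="inner"))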

Describe alternatives you've considered
Make this configurable and default to SQL behavior?

Additional context
NA

[FEATURE] Better support for class member as extensions

Is your feature request related to a problem? Please describe.
This piece of code describes the problem:

import pandas as pd
from fugue_sql import FugueSQLWorkflow

class Test(object):
    # schema: *
    def t(self, df:pd.DataFrame) -> pd.DataFrame:
        return df
    
test = Test()

with FugueSQLWorkflow() as dag:
    dag.df([[0]],"a:int").transform(test.t).show() # this works

f = test.t

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0]] SCHEMA a:int
    TRANSFORM USING f   # this works
    PRINT
    """)
    
with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0]] SCHEMA a:int
    TRANSFORM USING test.t   # this complains test.t is not a valid transformer
    PRINT
    """)

Describe the solution you'd like
TRANSFORM USING test.t should work; test.t should be recognized.
This is not a problem in Fugue but in triad:
https://github.com/fugue-project/triad/blob/84d1f01be413251a5a85ce89025c982e1306ed70/triad/utils/convert.py#L155
triad needs to be improved to recognize this expression.
On the Fugue side, additional tests should be added to cover class members used as extensions.

Describe alternatives you've considered
The alternative is the second example in the code above.

Additional context
NA

[FEATURE] Dropna functionality

Is your feature request related to a problem? Please describe.
Dropna functionality for fugue.

Describe the solution you'd like
The pandas implementation is too complex. Look to Spark for a simpler implementation.

Additional context
Apply this to Spark and Dask.
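
A hedged sketch of a Spark-like engine-level dropna; the name and signature are assumptions, with pandas used only for illustration:

from typing import List, Optional
import pandas as pd

def dropna(df: pd.DataFrame, how: str = "any",
           thresh: Optional[int] = None, subset: Optional[List[str]] = None) -> pd.DataFrame:
    # follow Spark's semantics: thresh overrides how when both are given
    if thresh is not None:
        return df.dropna(thresh=thresh, subset=subset)
    return df.dropna(how=how, subset=subset)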

[FEATURE] Set operations in programming interface

Is your feature request related to a problem? Please describe.
We can use Fugue SQL to UNION, INTERSECT, and EXCEPT, but it would be convenient to also do them in the programming interface.

Describe the solution you'd like
In the interfaces, we should have something like:

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    a=dag.df([[0]],"a:int")
    b=dag.df([[1]],"a:int")
    dag.union(a,b).show()
    a.union(b).show()

The two ways of calling union should give the same result.

NOTICE: if pandas and SQL behave differently, Fugue should always follow the SQL behavior.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] alter columns (change column types)

It's common to cast column types without changing anything else.

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1,1.2]],"a:int,b:int,c:double")
    df.alter_columns("b:str,c:int").show()

The above code should change the dataset to [[0,"1",1]]

Alternatively, we can do that in Fugue SQL

CREATE DATA [[0,1,1.2]] SCHEMA a:int,b:int,c:double
ALTER COLUMNS b:str,c:int
SHOW

[FEATURE] WITH statement in FugueSQL

WITH 
    a AS (SELECT 1 AS t),
    b AS (SELECT 2 AS t)
SELECT * FROM a UNION SELECT * FROM b

The WITH statement is an unsuccessful attempt to make SQL a full language; Fugue SQL has a much better solution for value assignment and reuse. However, we should try to be compatible with standard SQL on this.

So the compromise is: if users use a WITH statement, the entire WITH and SELECT statement should get data directly from the SQL engine (an external data source) and can't consume any previous dataframes.

For example, this is OK because it doesn't consume any dataframes:

WITH 
    a AS (SELECT 1 AS t),
    b AS (SELECT 2 AS t)
SELECT * FROM a UNION SELECT * FROM b

This is also OK because the data source is Hive:

WITH 
    a AS (SELECT * FROM hive.table1),
    b AS (SELECT * FROM hive.table2)
SELECT * FROM a UNION SELECT * FROM b

This is also OK because the data source is the specified custom SQL engine:

CONNECT CustomSQLEngine WITH 
    a AS (SELECT * FROM t1),
    b AS (SELECT * FROM t2)
SELECT * FROM a UNION SELECT * FROM b

This is not OK because it tries to consume t1 and t2, which were generated in previous steps:

t1 = CREATE [[0]] SCHEMA a:int
t2 = CREATE [[1]] SCHEMA a:int

WITH 
    a AS (SELECT * FROM t1),
    b AS (SELECT * FROM t2)
SELECT * FROM a UNION SELECT * FROM b

[FEATURE] Make sure partition by apply not removing null keys

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like
Pandas groupby-apply skips groups whose keys contain nulls, but this shouldn't happen in Fugue. We need to make sure that, for any execution engine, keys with nulls are always OK.
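
A small example of the pandas behavior to avoid; the dropna flag shown below exists in pandas >= 1.1 and illustrates the behavior Fugue should always provide:

import pandas as pd

df = pd.DataFrame({"k": ["a", None], "v": [1, 2]})

# default pandas groupby silently drops the None group: only "a" is processed
print(df.groupby("k").apply(len))

# pandas >= 1.1 can keep null keys with dropna=False; Fugue should behave this way on every engine
print(df.groupby("k", dropna=False).apply(len))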

Describe alternatives you've considered
No; the pandas default behavior should not be an option in Fugue.

Additional context
NA

[BUG] LIKE '{%' conflicts with jinja template

Minimal Code To Reproduce

CREATE [["x"]] SCHEMA a:str
SELECT * WHERE a LIKE '{%'
PRINT

Describe the bug
Jinja treats {% as the start of a template tag, which causes a syntax error.

Expected behavior
It should work

Environment (please complete the following information):

  • Backend: spark
  • Backend version: 3
  • Python version: 3.6.9
  • OS: linux

[BUG] SparkExecutionEngine can't accept pd.DataFrame in to_df

Minimal Code To Reproduce

import pandas as pd
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

pdf = pd.DataFrame([["2020-01-01"]], columns=["a"])

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag.df(pdf).show()

Describe the bug

adf = ArrowDataFrame(df, to_schema(schema))

to_schema can't take None.

Expected behavior
It should work

Environment (please complete the following information):

  • Backend: spark
  • Backend version: 3
  • Python version: 1.0.5
  • OS: linux

[FEATURE] Print hook

Printing in a notebook and in a console should be done in different ways. This has nothing to do with the execution engine or which dataframe we use. So the idea is to set a global hook when in a notebook (to use display for pretty printing).
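
A minimal sketch of the idea; the hook names here are hypothetical, not Fugue's API:

_print_hook = print  # default: plain console print

def set_print_hook(hook) -> None:
    global _print_hook
    _print_hook = hook

def show(obj) -> None:
    # every show() call goes through the hook, regardless of engine or dataframe type
    _print_hook(obj)

# in a notebook, a setup step could redirect to IPython's display for pretty printing:
# from IPython.display import display
# set_print_hook(display)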

[FEATURE] Compile-time schema inference

import pandas as pd
from fugue import FugueWorkflow

# input_has: a
# schema: *
def tr(df:pd.DataFrame) -> pd.DataFrame:
    return df

dag = FugueWorkflow()
dag.df([[0]],"b:int").transform(tr).persist() # this step will not fail because input_has is runtime validation
dag.run() # this step will fail, but if the dag is complicated, you have to wait for a long time

In certain cases like the one above, we are able to infer schemas even while constructing the dag. This can bring some schema validation to compile time, which would be very nice.

Notice that if an extension implements the interfaces, the schema may not be inferable at compile time, because the system can't know how long get_output_schema will take, so it can't call it during dag construction. Unless we change the extension interfaces to make compile-time schema inference feasible... I am not clear what a good way to do that is.

[FEATURE] DROP COLUMNS and DROP NA in Fugue SQL

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Add DROP COLUMNS and DROP NA to Fugue SQL.
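
A hedged sketch of one possible syntax, written inside a FugueSQLWorkflow for illustration; the DROP COLUMNS keyword shown is an assumption, not the implemented grammar:

from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,1]] SCHEMA a:int,b:int
    DROP COLUMNS b
    PRINT
    """)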

Describe alternatives you've considered
NA

Additional context
NA

[BUG] Save to parquet throws error on NativeExecutionEngine with pandas < 1.0

Minimal Code To Reproduce

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    dag.df([[0]],"a:int").save("/tmp/1.parquet")

Describe the bug
For pandas>=1 it works fine; for pandas<1 it complains that the schema argument is duplicated.

Expected behavior
It should work for pandas < 1

Environment (please complete the following information):

  • Backend: pandas
  • Backend version: 0.25.0
  • Python version: 3.6.9
  • OS: linux

[BUG] PandasDataFrame can't identify datetime columns correctly

Minimal Code To Reproduce

from fugue import PandasDataFrame
import pandas as pd

df = pd.DataFrame([["2020-01-01"]],columns=["d"])
df["d"]=pd.to_datetime(df["d"])
df1 = PandasDataFrame(df)
df2 = PandasDataFrame(df, "d:date")
d1 = list(df1.as_array_iterable(type_safe=True))
d2 = list(df2.as_array_iterable(type_safe=True))
print(df1.schema)
print(df2.schema)
print(type(d1[0][0]))
print(type(d2[0][0]))

Describe the bug
timestamp(ns) is not converted to datetime type

Expected behavior
df1.schema should be d:datetime

Environment (please complete the following information):

  • Backend: pandas
  • Backend version: 1.0.5
  • Python version: 3.6.9
  • OS: linux

[FEATURE] Fugue macro prototyping

Something like this

@fugue_task
def task0(dag:FugueWorkflow, d:str):
    dag.load(d+"1.parquet").transform(t0).yield_as("r")

@fugue_task
def task1(dag:FugueSQLWorkflow, a: DataFrame, b:DataFrame, c:int) -> None:
    dag("""
    x=TRANSFORM a USING t1(c={{c}})
    y=TRANSFORM b USING t2
    result = SELECT x.*, y.pp 
        FROM x INNER JOIN y ON x.key = y.key
        YIELD
    """)

@fugue_task
def task2(dag:FugueSQLWorkflow, a: DataFrame, b:DataFrame, c:int) -> None:
    dag("""
    x=TRANSFORM a USING t3 YIELD
    y=TRANSFORM b USING t2 YIELD
    """)

@fugue_task
def save(dag:FugueWorkflow, r:DataFrame, path:str) -> None:
    dag.df(r).save(path)


@fugue_macro_workflow
def demo(fmw:FugueMacroWorkflow, a:int, b:int):
    t0 = fmw(task0(d="abc"))
    t1 = fmw(task0(d="def"))
    t2 = task1(t0.r, t1.r, c=a)
    t3 = task2(t0.r, t2.result, c=b)
    save(t2.result,path="1")
    save(t3.x,path="1")
    save(t3.y,path="1")

demo.compile(NativeMacroCompiler()).run(a=1,b=2)

[FEATURE] Deterministic checkpoint and unification of checkpoints

Checkpoints are for caching. There should be three types of checkpoints:

  • weak checkpoint: similar to Spark persist. It is good enough for most use cases and avoids recomputing a dataframe, but it may not break up the compute dependency (this is a tricky statement); for example, in Spark, if the compute graph is very large, a persist can still cause a stack overflow.
  • strong checkpoint: save the dataframe to a file and load it back. The benefit is that the compute graph is guaranteed to be broken up at this point, so even with a very large compute graph, every strong checkpoint is a fresh start.
  • deterministic checkpoint: similar to a strong checkpoint, but the checkpoint file is not removed after the execution completes, and the file path is deterministic, so if you run the same thing a second time, it can start from the checkpoint, which can save a lot of time.
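
A hedged sketch of how the three types might be expressed on a workflow dataframe; the method names are assumptions used for illustration:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0]], "a:int")

df.persist()                   # weak checkpoint: engine-level caching
df.strong_checkpoint()         # strong checkpoint: save to a temp file and load back (assumed name)
df.deterministic_checkpoint()  # deterministic checkpoint: deterministic path, reusable across runs (assumed name)
dag.run()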

All of these checkpoints have a lazy/eager mode (all default to eager):

  • lazy means the first usage of the dataframe triggers the caching; subsequent usages use the cached dataframe
  • eager means the dataframe caching happens before any usage.

Eager mode is more intuitive, and most cases should use it.
Currently, strong and deterministic checkpoints are always eager, so it doesn't matter whether you set lazy to true or false.

Notice:

  • weak checkpoint is the one to use in most cases
  • weak and strong checkpoints are for single executions; nothing is remembered across executions
  • deterministic checkpoints are for the cross-execution scenario. Normally they should NEVER be used in production, but they can be powerful during development
  • currently, the deterministic checkpoint implementation is not perfect: if there are non-lazy previous steps, they will still run and take time

[FEATURE] extension input validation

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
We need a certain level of extension validation; some of it can be done at workflow compile time and some at runtime. For example:

import pandas as pd

# input has: a,b:str,c,d:int
# prepartition has: a,b
# presort has: c,d desc
# schema: *
def dummy_transformer(df:pd.DataFrame) -> pd.DataFrame:
    return df

Decorators and interfaces should also support this type of validation. We should start implementing at the interface level.

input has: can only be verified at runtime, because schemas can be dynamic. But the other validations can be done during dag construction, which can save significant time in finding problems, and the syntax is very expressive.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Persist to file

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Currently, Fugue can only persist to memory, and with the Spark engine, Fugue calls Spark's persist + count (an eager in-memory checkpoint).

However, this is not good enough:

  • Spark persist is different from Spark checkpoint: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk
  • Spark persist does not break the lineage. Although persist + count ensures the persisted dataframe is not computed again, the lineage still exists, and with a long execution graph Spark will have overflow issues (even if you dedup the compute). This is a problem of Spark itself, but we need to deal with it at the Fugue level.
  • Persisting to a file in Spark is equivalent to a checkpoint; however, I am not sure whether we should use the Spark implementation or create our own. With our own implementation we can bring better consistency across engines, and the implementation isn't very complicated.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Save and load to use

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
The idea is very similar to a checkpoint; the difference is that the user specifies where to save the file. That way we can use it as a checkpoint, but the dataframe is also explicitly saved to a place we choose so we can use it later.
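
A hedged sketch of the idea; the combined call name below is hypothetical, and the two-workflow pattern is what it would replace:

from fugue import FugueWorkflow

# today: save in one workflow, then load in a later one to reuse the data
dag1 = FugueWorkflow()
dag1.df([[0]], "a:int").save("/tmp/staged.parquet")
dag1.run()

dag2 = FugueWorkflow()
dag2.load("/tmp/staged.parquet").show()
dag2.run()

# proposed: one step that both checkpoints and saves to a user-chosen path, e.g.
# df.save_and_use("/tmp/staged.parquet")  # hypothetical name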

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Add more general unit tests for datetimes

Is your feature request related to a problem? Please describe.
#49
#50

Describe the solution you'd like
We need to add more datetime-related tests to the test suites to make sure datetimes work consistently.

Describe alternatives you've considered
NA

Additional context
NA

[BUG] SparkExecutionEngine can't take string as datetime inside dataframe

Minimal Code To Reproduce

from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag("""
    CREATE [["2020-01-01"]] SCHEMA a:datetime
    PRINT
    """)

Describe the bug

adf = ArrowDataFrame(df, to_schema(schema))

If we directly use ArrowDataFrame, Arrow can't do the smart conversion. We need a better way to handle this.

Expected behavior
For any execution engine, strings as datetimes should at least work in Fugue SQL.

Environment (please complete the following information):

  • Backend: spark
  • Backend version: 3
  • Python version: 3.6.9
  • OS: linux

[FEATURE] Specify SQLEngine inside Fugue SQL

We can change the SQLEngine through the programming interface when running SQL, but this is not implemented inside Fugue SQL.

This should work

[SqliteEngine] SELECT * FROM some_table

[<engine> (params)] should be the expression for specifying the SQL engine inside Fugue SQL.
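
A hedged sketch of the proposed usage inside a workflow; the bracket syntax is the proposal, not the current grammar:

from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag("""
    tbl = CREATE [[0]] SCHEMA a:int
    [SqliteEngine] SELECT * FROM tbl
    PRINT
    """)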
