fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.

Home Page: https://fugue-tutorials.readthedocs.io/

License: Apache License 2.0

Languages: Python 98.31%, Jupyter Notebook 1.14%, Makefile 0.28%, JavaScript 0.25%, Shell 0.02%
Topics: spark, dask, data-practitioners, machine-learning, distributed-systems, distributed-computing, distributed, sql, pandas

fugue's Introduction


Fugue is a unified interface for distributed computing that lets users execute Python, Pandas, and SQL code on Spark, Dask, and Ray with minimal rewrites.

Fugue is most commonly used for:

  • Parallelizing or scaling existing Python and Pandas code by bringing it to Spark, Dask, or Ray with minimal rewrites.
  • Using FugueSQL to define end-to-end workflows on top of Pandas, Spark, and Dask DataFrames. FugueSQL is an enhanced SQL interface that can invoke Python code.

To see how Fugue compares to other frameworks like dbt, Arrow, Ibis, and PySpark Pandas, see the comparisons page.

The Fugue API is a collection of functions that are capable of running on Pandas, Spark, Dask, and Ray. The simplest way to use Fugue is the transform() function, which lets users parallelize the execution of a single function by bringing it to Spark, Dask, or Ray. In the example below, the map_letter_to_food() function takes in a mapping and applies it to a column. This is just Pandas and Python so far (without Fugue).

import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id":[0,1,2], "value": (["A", "B", "C"])})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df

Now, the map_letter_to_food() function is brought to the Spark execution engine by invoking the transform() function of Fugue. The output schema and params are passed to the transform() call. The schema is needed because it's a requirement for distributed frameworks. A schema of "*" below means all input columns are in the output.

from pyspark.sql import SparkSession
from fugue import transform

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(input_df)

out = transform(
    sdf,
    map_letter_to_food,
    schema="*",
    params=dict(mapping=map_dict),
)
# out is a Spark DataFrame
out.show()
+---+------+
| id| value|
+---+------+
|  0| Apple|
|  1|Banana|
|  2|Carrot|
+---+------+
PySpark equivalent of Fugue transform()
from typing import Iterator, Union
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame, SparkSession

spark_session = SparkSession.builder.getOrCreate()

def mapping_wrapper(dfs: Iterator[pd.DataFrame], mapping):
    for df in dfs:
        yield map_letter_to_food(df, mapping)

def run_map_letter_to_food(input_df: Union[DataFrame, pd.DataFrame], mapping):
    # convert to a Spark DataFrame if needed
    if isinstance(input_df, pd.DataFrame):
        sdf = spark_session.createDataFrame(input_df.copy())
    else:
        sdf = input_df  # Spark DataFrames are immutable; no copy needed

    schema = StructType(list(sdf.schema.fields))
    return sdf.mapInPandas(lambda dfs: mapping_wrapper(dfs, mapping),
                           schema=schema)

result = run_map_letter_to_food(input_df, map_dict)
result.show()

This syntax is simpler, cleaner, and more maintainable than the PySpark equivalent. At the same time, no edits were made to the original Pandas-based function to bring it to Spark. It is still usable on Pandas DataFrames. Fugue transform() also supports Dask and Ray as execution engines alongside the default Pandas-based engine.

The Fugue API has a broader collection of functions that are also compatible with Spark, Dask, and Ray. For example, we can use load() and save() to create an end-to-end workflow that runs on all three backends. For the full list of functions, see the Top Level API.

import fugue.api as fa

def run(engine=None):
    with fa.engine_context(engine):
        df = fa.load("/path/to/file.parquet")
        out = fa.transform(df, map_letter_to_food, schema="*")
        fa.save(out, "/path/to/output_file.parquet")

run()                 # runs on Pandas
run(engine="spark")   # runs on Spark
run(engine="dask")    # runs on Dask

All functions underneath the context will run on the specified backend. This makes it easy to toggle between local and distributed execution.
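The engine can also be a live object rather than a string; a minimal sketch, assuming an existing SparkSession is passed directly as the engine and reusing map_letter_to_food() from above:

from pyspark.sql import SparkSession
import fugue.api as fa

spark = SparkSession.builder.getOrCreate()

# Passing the session object (instead of the string "spark") pins every
# fa.* call in this block to that specific SparkSession.
with fa.engine_context(spark):
    df = fa.load("/path/to/file.parquet")
    out = fa.transform(df, map_letter_to_food, schema="*")
    fa.save(out, "/path/to/output_file.parquet")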

FugueSQL is a SQL-based language capable of expressing end-to-end data workflows on top of Pandas, Spark, and Dask. The map_letter_to_food() function above is used in the SQL expression below. This is how to use a Python-defined function along with the standard SQL SELECT statement.

from fugue.api import fugue_sql
import json

query = """
    SELECT id, value
      FROM input_df
    TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
    """
map_dict_str = json.dumps(map_dict)

# returns Pandas DataFrame
fugue_sql(query, mapping=map_dict_str)

# returns Spark DataFrame
fugue_sql(query, mapping=map_dict_str, engine="spark")

Installation

Fugue can be installed through pip or conda. For example:

pip install fugue

In order to use Fugue SQL, it is strongly recommended to install the sql extra:

pip install fugue[sql]

It also has the following installation extras:

  • sql: to support Fugue SQL. Without this extra, the non-SQL part still works. Before Fugue 0.9.0, this extra was included in Fugue's core dependencies, so you didn't need to install it explicitly; from 0.9.0 on, it is required if you want to use Fugue SQL.
  • spark: to support Spark as the ExecutionEngine.
  • dask: to support Dask as the ExecutionEngine.
  • ray: to support Ray as the ExecutionEngine.
  • duckdb: to support DuckDB as the ExecutionEngine, read details.
  • polars: to support Polars DataFrames and extensions using Polars.
  • ibis: to enable Ibis for Fugue workflows, read details.
  • cpp_sql_parser: to enable the CPP antlr parser for Fugue SQL. It can be 50+ times faster than the pure Python parser. Pre-built binaries are available for the main Python versions and platforms; for the rest, a C++ compiler is needed to build it on the fly.

For example, a common use case is:

pip install "fugue[duckdb,spark]"

Note that if you have already installed Spark or DuckDB independently, Fugue can automatically use them without installing the extras.

The best way to get started with Fugue is to work through the 10 minute tutorials:

For the top level API, see:

The tutorials can also be run in an interactive notebook environment through binder or Docker:

Using binder


Note that it runs slowly on binder because the binder machine isn't powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give you accurate numbers.

Using Docker

Alternatively, you should get decent performance by running this Docker image on your own machine:

docker run -p 8888:8888 fugueproject/tutorials:latest

Jupyter Notebook Extension

There is an accompanying notebook extension for FugueSQL that lets users use the %%fsql cell magic. The extension also provides syntax highlighting for FugueSQL cells. It works for both the classic notebook and JupyterLab. More details can be found in the installation instructions.


Ecosystem

As an abstraction layer, Fugue can be used seamlessly with many other open-source projects.

Python backends:

FugueSQL backends:

  • Pandas - FugueSQL can run on Pandas
  • DuckDB - in-process SQL OLAP database management system
  • dask-sql - SQL interface for Dask
  • SparkSQL
  • BigQuery
  • Trino

Fugue is available as a backend or can integrate with the following projects:

Registered 3rd-party extensions (mainly for Fugue SQL) include:

  • Pandas plot - visualize data using matplotlib or plotly
  • Seaborn - visualize data using seaborn
  • WhyLogs - visualize data profiling
  • Vizzu - visualize data using ipyvizzu

Community and Contributing

Feel free to message us on Slack. We also have contributing instructions.

Case Studies

Mentioned Uses

Further Resources

View some of our latest conference presentations and content. For a more complete list, check the Content page in the tutorials.

Blogs

Conferences

fugue's People

Contributors

aholten, ahuang11, alexander-beedie, anticorrelator, avriiil, bitsofinfo, crsantiago, gityow, gliptak, goodwanghan, jamie256, jaymanalastas, kvnkho, laurenterreca, mfahadakbar, nils-braun, rbergeron, rdmolony, synapticarbors, veghdev, wangchx


fugue's Issues

[FEATURE] Fugue macro prototyping

Something like this:

@fugue_task
def task0(dag:FugueWorkflow, d:str):
    dag.load(d+"1.parquet").transform(t0).yield_as("r")

@fugue_task
def task1(dag:FugueSQLWorkflow, a: DataFrame, b:DataFrame, c:int) -> None:
    dag("""
    x=TRANSFORM a USING t1(c={{c}})
    y=TRANSFORM b USING t2
    result = SELECT x.*, y.pp 
        FROM x INNER JOIN y ON x.key = y.key
        YIELD
    """)

@fugue_task
def task2(dag:FugueSQLWorkflow, a: DataFrame, b:DataFrame, c:int) -> None:
    dag("""
    x=TRANSFORM a USING t3 YIELD
    y=TRANSFORM b USING t2 YIELD
    """)

@fugue_task
def save(dag:FugueWorkflow, r:DataFrame, path:str) -> None:
    dag.df(r).save(path)


@fugue_macro_workflow
def demo(fmw:FugueMacroWorkflow, a:int, b:int):
    t0 = fmw(task0(d="abc"))
    t1 = fmw(task0(d="def"))
    t2 = task1(t0.r, t1.r, c=a)
    t3 = task2(t0.r, t2.result, c=b)
    save(t2.result,path="1")
    save(t3.x,path="1")
    save(t3.y,path="1")

demo.compile(NativeMacroCompiler()).run(a=1,b=2)

[FEATURE] csv infer schema consistency

Currently, loading CSV on different engines has different infer-schema behavior: Spark doesn't infer by default, but Pandas does. We should have a common switch, like header, to control this with a single default value.
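A rough sketch of what a unified switch could look like at the API level; the infer_schema flag follows Fugue's CSV loading options, though the exact default is the open question here:

import fugue.api as fa

# One switch with one meaning on every engine: either infer column types
# from the data, or read everything as string.
typed_df = fa.load("/path/to/file.csv", header=True, infer_schema=True, engine="spark")
str_df = fa.load("/path/to/file.csv", header=True, infer_schema=False)  # local/Pandas engine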

[FEATURE] Print hook

Printing in a notebook and in a console should work differently. This has nothing to do with the execution engine or the dataframe type, so the idea is to set a global hook when in a notebook (to use display for pretty printing).
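A hypothetical sketch of such a hook; the names below are illustrative and not Fugue's actual API:

from typing import Any, Callable

# Global print hook; defaults to plain print for console use
_print_hook: Callable[[Any], None] = print

def set_print_hook(hook: Callable[[Any], None]) -> None:
    # In a notebook this could be set to IPython.display.display
    global _print_hook
    _print_hook = hook

def show(df: Any) -> None:
    # Every PRINT/show call routes through the hook, independent of the
    # execution engine or dataframe type
    _print_hook(df)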

[BUG] SparkExecutionEngine can't accept pd.DataFrame in to_df

Minimal Code To Reproduce

import pandas as pd
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

pdf = pd.DataFrame([["2020-01-01"]], columns=["a"])

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag.df(pdf).show()

Describe the bug

adf = ArrowDataFrame(df, to_schema(schema))

to_schema can't take None

Expected behavior
It should work

Environment (please complete the following information):

  • Backend: Spark
  • Backend version: 3
  • Python version: 1.0.5
  • OS: Linux

[FEATURE] Make sure programming interface join follows SQL join convention

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like
Pandas joins and SQL joins treat NULLs differently (SQL does not match nulls to each other). We should make sure the join behavior follows the SQL join convention so it's consistent with Fugue SQL behavior.
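A small illustration of the difference, using plain Pandas (no Fugue):

import pandas as pd

a = pd.DataFrame({"key": [None, "x"], "v1": [1, 2]})
b = pd.DataFrame({"key": [None, "x"], "v2": [3, 4]})

# Pandas pairs the None keys with each other, so this returns 2 rows;
# a SQL INNER JOIN on the same data would return only the "x" row.
print(a.merge(b, on="key", how="inner"))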

Describe alternatives you've considered
Make this configurable and default to SQL behavior?

Additional context
NA

[FEATURE] extension input validation

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
We need a certain level of extension validation; some of it can be done at workflow compile time and some at runtime, for example:

# input has: a,b:str,c,d:int
# prepartition has: a,b
# presort has: c,d desc
# schema: *
def dummy_transformer(df:pd.DataFrame) -> pd.DataFrame:
      return df

Decorators and interfaces should also support this type of validation. We should start implementing at the interface level.

input has can only be verified at runtime, because the schemas can be dynamic. But the other validations can be done during dag construction, which can save significant time in finding problems and is also very expressive.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] sample table

The Pandas DataFrame API is the one we try to follow:

df.sample(n, frac, replacement=False, random_seed=None)

[FEATURE] Make sure partition by apply not removing null keys

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like
Pandas groupby-apply will skip groups whose keys contain nulls, but this shouldn't happen in Fugue. We need to make sure that for any execution engine, keys with nulls are always kept.
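A small illustration of the Pandas default this refers to:

import pandas as pd

df = pd.DataFrame({"key": [None, "x"], "v": [1, 2]})

# The None-key group silently disappears with the Pandas default;
# Fugue's partition-by should keep it on every engine.
print(df.groupby("key").size())                 # only "x"
print(df.groupby("key", dropna=False).size())   # both groups (pandas >= 1.1)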

Describe alternatives you've considered
No, the Pandas default behavior should not even be an option in Fugue.

Additional context
NA

[FEATURE] Add more general unit tests for datetimes

Is your feature request related to a problem? Please describe.
#49
#50

Describe the solution you'd like
We need to add more datetime-related tests to the test suites to make sure they work consistently.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Passing dataframes cross workflow

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like

dag1 = FugueWorkflow()
df = dag1.df([[0]],"a:int")
df.yield_as("output")
dag1.run()

dag2 = FugueWorkflow()
df = dag2.df(dag1["output"])
df.show()
dag2.run()

To pass a dataframe across workflows, the dataframe must be stored in persistent storage (saved to disk), so dag1["output"] should just be a reference (a file path). Notice that within the same workflow, yield may not be necessary.

For Fugue SQL:

dag1 = FugueSQLWorkflow()
dag1("""
output = CREATE [[0]] SCHEMA a:int
YIELD output
# or YIELD output AS out
""")
dag1.run()

dag2 = FugueSQLWorkflow()
dag2("""
PRINT df
""", df = dag1["output"])
dag2.run()

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Fillna as builtin

Is your feature request related to a problem? Please describe.
We should include a fillna method in the execution engine.

Describe the solution you'd like
It just needs to be simple like the Spark implementation. No need for forward fill and backfill.

Additional context
It will have a very similar implementation to the existing dropna method in the execution engine.
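A minimal sketch of the Spark-style signature expressed with Pandas, only to pin down the intended semantics (illustrative, not Fugue's implementation):

from typing import Any, Dict, List, Optional, Union
import pandas as pd

def fillna(df: pd.DataFrame,
           value: Union[Any, Dict[str, Any]],
           subset: Optional[List[str]] = None) -> pd.DataFrame:
    # Mirrors Spark's DataFrame.fillna: a per-column dict, or a scalar
    # applied to a subset of columns (all columns when subset is None)
    if isinstance(value, dict):
        return df.fillna(value)
    cols = subset or list(df.columns)
    out = df.copy()
    out[cols] = out[cols].fillna(value)
    return out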

[FEATURE] Compile-time schema inference

# input_has: a
# schema: *
def tr(df:pd.DataFrame) -> pd.DataFrame:
    return df

dag = FugueWorkflow()
dag.df([[0]],"b:int").transform(tr).persist() # this step will not fail because input_has is runtime validation
dag.run() # this step will fail, but if the dag is complicated, you have to wait for a long time

In certain cases like above, we are able to infer schemas even when constructing the dag. This can bring some schema validations to compile time, which can be very nice.

Notice that if an extension implements the interfaces, the schema may not be inferable at compile time, because the system can't know how expensive get_output_schema is, so it can't call it while constructing the dag. Unless we change the extension interfaces to make compile-time schema inference feasible; I am not clear what a good way to do that is.

[FEATURE] alter columns (change column types)

It's common to cast column types without changing other things.

with FugueWorkflow() as dag:
    df = dag.df([[0,1,1.2]],"a:int,b:int,c:double")
    df.alter_columns("b:str,c:int").show()

The above code should change the dataset to [[0,"1",1]].

Alternatively, we can do the same in Fugue SQL:

CREATE DATA [[0,1,1.2]] SCHEMA a:int,b:int,c:double
ALTER COLUMNS b:str,c:int
SHOW

[BUG] PandasDataFrame can't identify datetime columns correctly

Minimal Code To Reproduce

from fugue import PandasDataFrame
import pandas as pd

df = pd.DataFrame([["2020-01-01"]],columns=["d"])
df["d"]=pd.to_datetime(df["d"])
df1 = PandasDataFrame(df)
df2 = PandasDataFrame(df, "d:date")
d1 = list(df1.as_array_iterable(type_safe=True))
d2 = list(df2.as_array_iterable(type_safe=True))
print(df1.schema)
print(df2.schema)
print(type(d1[0][0]))
print(type(d2[0][0]))

Describe the bug
timestamp(ns) is not converted to datetime type

Expected behavior
df1.schema should be d:datetime

Environment (please complete the following information):

  • Backend: Pandas
  • Backend version: 1.0.5
  • Python version: 3.6.9
  • OS: Linux

[BUG] LIKE '{%' conflicts with jinja template

Minimal Code To Reproduce

CREATE [["x"]] SCHEMA a:str
SELECT * WHERE a LIKE '{%'
PRINT

Describe the bug
Jinja treats {% as the start of a template tag, which results in a syntax error

Expected behavior
It should work

Environment (please complete the following information):

  • Backend: Spark
  • Backend version: Spark 3
  • Python version: 3.6.9
  • OS: Linux

[FEATURE] Set operations in programming interface

Is your feature request related to a problem? Please describe.
We can use Fugue SQL to UNION, INTERSECT, and EXCEPT, but it would be convenient to also do them in the programming interface.

Describe the solution you'd like
In the interfaces we should have something like

with FugueWorkflow() as dag:
    a=dag.df([[0]],"a:int")
    b=dag.df([[1]],"a:int")
    dag.union(a,b).show()
    a.union(b).show()

The two ways of union should give the same result.

Notice: if Pandas and SQL behave differently, Fugue should always follow the SQL behavior.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] UNION on transforms

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like

CREATE DATA [[0]] SCHEMA a:int
TRANSFORM USING t1 UNION TRANSFORM USING t2
PRINT

Describe alternatives you've considered
This works now:

CREATE DATA [[0]] SCHEMA a:int
SELECT * FROM (TRANSFORM USING t1) UNION SELECT * FROM (TRANSFORM USING t2)
PRINT

Additional context
INTERSECT and EXCEPT should also be implemented

[FEATURE] Dropna functionality

Is your feature request related to a problem? Please describe.
Dropna functionality for fugue.

Describe the solution you'd like
The implementation of pandas is too complex. Look to Spark for a simpler implementation.

Additional context
Applies to Spark and Dask as well.
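A sketch of the Spark-style signature (how, thresh, subset) expressed with Pandas, again only to pin down the semantics:

from typing import List, Optional
import pandas as pd

def dropna(df: pd.DataFrame,
           how: str = "any",
           thresh: Optional[int] = None,
           subset: Optional[List[str]] = None) -> pd.DataFrame:
    # Same three knobs as Spark's DataFrame.dropna; when thresh is given
    # it takes precedence over how, matching Spark's behavior
    if thresh is not None:
        return df.dropna(thresh=thresh, subset=subset)
    return df.dropna(how=how, subset=subset)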

[FEATURE] Output transformer

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Currently, Fugue transformers must return something. Sometimes we just want to process something in a distributed way and then stop. For example, for a big dataframe we can partition by certain keys and then use a transformer to serialize and save each partition (in its own way).

With the current design, we need to transform and persist to trigger this execution. It works, but conceptually it's a hack. We need a clear definition of this as a new type of extension, maybe OutputTransformer?

We also need to add the programming and SQL interfaces, plus interfaceless support, for it, and update the docs.
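A hedged sketch of the intended usage, assuming an out_transform-style entry point; the exact name and signature are part of the proposal, not settled API:

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"id": [0, 0, 1], "value": [1, 2, 3]})

def save_partition(pdf: pd.DataFrame) -> None:
    # Returns nothing: the point is the per-partition side effect
    key = pdf["id"].iloc[0]
    pdf.to_parquet(f"/tmp/part_{key}.parquet")  # illustrative output path

# Run the function once per "id" partition, without producing an output dataframe
fa.out_transform(df, save_partition, partition={"by": "id"})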

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Better support for class member as extensions

Is your feature request related to a problem? Please describe.
This piece of code describes the problem:

import pandas as pd
from fugue_sql import FugueSQLWorkflow

class Test(object):
    # schema: *
    def t(self, df:pd.DataFrame) -> pd.DataFrame:
        return df
    
test = Test()

with FugueSQLWorkflow() as dag:
    dag.df([[0]],"a:int").transform(test.t).show() # this works

f = test.t

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0]] SCHEMA a:int
    TRANSFORM USING f   # this works
    PRINT
    """)
    
with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0]] SCHEMA a:int
    TRANSFORM USING test.t   # this complains test.t is not a valid transformer
    PRINT
    """)

Describe the solution you'd like
TRANSFORM USING test.t should work; test.t should be recognized.
This is not a problem in Fugue but in triad:
https://github.com/fugue-project/triad/blob/84d1f01be413251a5a85ce89025c982e1306ed70/triad/utils/convert.py#L155
triad needs an improvement to be able to recognize this expression.
On the Fugue side, additional tests should be added to cover class members as extensions.

Describe alternatives you've considered
The alternative is the second example in the code above.

Additional context
NA

[FEATURE] Persist to file

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Currently, Fugue can only persist to memory, and with the Spark engine, Fugue calls Spark's persist + count (an eager in-memory checkpoint).

However, this is not good enough:

  • Spark persist is different from Spark checkpoint: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk
  • Spark persist does not break the lineage. Although persist + count ensures the persisted dataframe is not computed again, the lineage still exists; with a long execution graph, Spark can hit stack overflow issues (even if you dedup the compute). This is a problem of Spark itself, but we need to deal with it at the Fugue level.
  • Persisting to a file in Spark is equivalent to checkpoint; however, I am not sure whether we should use the Spark implementation or create our own. With our own implementation we can bring better consistency across engines, and the implementation isn't very complicated.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Save and load to use

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
The idea is very similar to a checkpoint; the difference is that the file location is specified by the user. That way we can use it as a checkpoint, but the dataframe is also explicitly saved to a place of our choosing so it can be used later on.

Describe alternatives you've considered
NA

Additional context
NA

[FEATURE] Deterministic checkpoint and unification of checkpoints

Checkpointing is for caching; there should be 3 types of checkpoints:

  • weak checkpoint: similar to Spark persist. It is good enough for most use cases and avoids duplicated computation of a dataframe, but it may not break up the compute dependency (this is a tricky statement); for example, in Spark, a persist can still cause a stack overflow if the compute graph is very large.
  • strong checkpoint: save the dataframe to a file and load it back. The benefit is a guarantee that the compute graph is broken at this point, so even with a very large compute graph, every strong checkpoint is a fresh start.
  • deterministic checkpoint: similar to a strong checkpoint, but the checkpoint file is not removed after each complete execution and the file path is deterministic, so a second run of the same thing can start from the checkpoint, which can save a lot of time.

All of these checkpoints have lazy/eager modes (all default to eager):

  • lazy means the first usage of the dataframe triggers the caching; later usages use the cached dataframe.
  • eager means the caching happens before any usage.

Eager mode is more intuitive, and most cases should use it.
Currently, strong and deterministic checkpoints are always eager, so it doesn't matter whether you set lazy to true or false.

Notice:

  • weak checkpoint is the one to use in most cases.
  • weak and strong checkpoints are for single executions; there is no memory across executions.
  • deterministic checkpoint is for the cross-execution scenario. Normally it should NEVER be used in production, but it can be powerful during development.
  • currently, the deterministic checkpoint implementation is not perfect: if there are non-lazy previous steps, they will still run and take time.
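A sketch of how the three flavors might look in the programming interface, assuming weak_checkpoint / strong_checkpoint / deterministic_checkpoint methods on the workflow dataframe:

import pandas as pd
from fugue import FugueWorkflow

# schema: *
def double(df: pd.DataFrame) -> pd.DataFrame:
    df["a"] = df["a"] * 2
    return df

dag = FugueWorkflow()
a = dag.df([[1]], "a:int").transform(double).weak_checkpoint()           # persist-style cache; lineage kept
b = dag.df([[2]], "a:int").transform(double).strong_checkpoint()         # save to a temp file and load back; lineage broken
c = dag.df([[3]], "a:int").transform(double).deterministic_checkpoint()  # file kept across runs, path is deterministic
a.show()
b.show()
c.show()
dag.run()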

[FEATURE] presort should also take array as input

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like

def _parse_presort_exp( # noqa: C901

Currently, presort can only be a string expression, but the programming interface should also be able to take arrays:

"a,b" == ["a", "b"] == [("a", True),("b", True)]
"a desc,b" == [("a", False), "b"] == [("a", False),("b", True)]

Describe alternatives you've considered
NA

Additional context
NA

[BUG] Save to parquet throws error on NativeExecutionEngine with pandas < 1.0

Minimal Code To Reproduce

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    dag.df([[0]],"a:int").save("/tmp/1.parquet")

Describe the bug
For pandas>=1 it works fine; for pandas<1 it complains that the schema argument is duplicated.

Expected behavior
It should work for pandas < 1

Environment (please complete the following information):

  • Backend: pandas
  • Backend version: 0.25.0
  • Python version: 3.6.9
  • OS: Linux

[BUG] There are some tests failing in Windows

Minimal Code To Reproduce

python3 -bb -m pytest tests/fugue

Describe the bug
On my system, the tests produced the following errors:

FAILED tests/fugue/dataframe/test_utils.py::test_serialize_df - fs.errors.InvalidCharsInPath: path 'C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_serialize_df0\1.pkl' contains invalid characters
FAILED tests/fugue/execution/test_naive_execution_engine.py::NativeExecutionEngineTests::test_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_io0\a
FAILED tests/fugue/execution/test_naive_execution_engine.py::NativeExecutionEngineBuiltInTests::test_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_io1\a
FAILED tests/fugue/utils/test_io.py::test_parquet_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_parquet_io0\a.parquet
FAILED tests/fugue/utils/test_io.py::test_csv_io - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_csv_io0\a.csv
FAILED tests/fugue/utils/test_io.py::test_json - ValueError: invalid path C:\Users\Kevin\AppData\Local\Temp\pytest-of-Kevin\pytest-5\test_json0\a.json

Expected behavior
Tests should work despite the OS

Environment (please complete the following information):

  • Python version: 3.6.9
  • OS: Windows

[FEATURE] Specify SQLEngine inside Fugue SQL

We can change the SQLEngine in the programming interface when running SQL, but this is not implemented inside Fugue SQL.

This should work:

[SqliteEngine] SELECT * FROM some_table

[<engine> (params)] should be the expression to specify a SQL engine inside Fugue SQL.

[FEATURE] WITH statement in FugueSQL

WITH 
    a AS (SELECT 1 AS t),
    b AS (SELECT 2 AS t)
SELECT * FROM a UNION SELECT * FROM b

The WITH statement is an unsuccessful attempt to make SQL a full language; Fugue SQL has a much better solution for value assignment and reuse. However, we should try to be compatible with standard SQL here.

So the compromise will be: if users use a WITH statement, the entire WITH + SELECT statement should get its data directly from the SQL engine (an external data source) and can't consume any previous dataframes.

For example, this is ok because it doesn't consume dataframes:

WITH 
    a AS (SELECT 1 AS t),
    b AS (SELECT 2 AS t)
SELECT * FROM a UNION SELECT * FROM b

This is also ok, because the data source is from Hive:

WITH 
    a AS (SELECT * FROM hive.table1),
    b AS (SELECT * FROM hive.table2)
SELECT * FROM a UNION SELECT * FROM b

This is also ok, because the data source is from the specified custom SQL engine:

CONNECT CustomSQLEngine WITH
    a AS (SELECT * FROM t1),
    b AS (SELECT * FROM t2)
SELECT * FROM a UNION SELECT * FROM b

This is not ok, because it tries to consume t1 and t2 generated in the previous steps:

t1 = CREATE [[0]] SCHEMA a:int
t2 = CREATE [[1]] SCHEMA a:int

WITH 
    a AS (SELECT * FROM t1),
    b AS (SELECT * FROM t2)
SELECT * FROM a UNION SELECT * FROM b

[FEATURE] Fugueless transform and sql

Make transform and sql utility functions so that for simple (one-step) use cases, users only need to call the function, without starting a FugueWorkflow.
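This is essentially what the top-level transform() and fugue_sql() shown earlier in this README provide; a minimal one-step sketch:

import pandas as pd
from fugue import transform

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    df["a"] = df["a"] + 1
    return df

# One call, no FugueWorkflow needed; pass engine="spark" or "dask" to distribute
result = transform(pd.DataFrame({"a": [1, 2]}), add_one, schema="*")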

[BUG] SparkExecutionEngine can't take string as datetime inside dataframe

Minimal Code To Reproduce

from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag("""
    CREATE [["2020-01-01"]] SCHEMA a:datetime
    PRINT
    """)

Describe the bug

adf = ArrowDataFrame(df, to_schema(schema))

If directly using ArrowDataFrame, arrow can't do the smart conversion. We need a better way to handle this.

Expected behavior
For any execution engine, a string used as a datetime should at least work in Fugue SQL

Environment (please complete the following information):

  • Backend: Spark
  • Backend version: 3
  • Python version: 3.6.9
  • OS: Linux

[BUG] file.latest.parquet is not recognized

Minimal Code To Reproduce

NA

Describe the bug
The problem is here:

self.suffix in _FORMAT_MAP,

It takes .latest.parquet as the suffix, so the file can't be accepted.

Expected behavior
It should accept it.

Environment (please complete the following information):

  • Backend:
  • Backend version:
  • Python version: 3.6.9
  • OS: Linux

[FEATURE] DROP COLUMNS and DROP NA in Fugue SQL

Is your feature request related to a problem? Please describe.
NA

Describe the solution you'd like
Add DROP COLUMNS (and a way to drop rows with nulls) to FugueSQL.
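A possible syntax sketch; the exact grammar is only a suggestion:

CREATE [[0, 1]] SCHEMA a:int,b:int
DROP COLUMNS b
DROP ROWS IF ANY NULL
PRINT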

Describe alternatives you've considered
NA

Additional context
NA

[BUG] datetime column (pd.DataFrame) returned in Transformer is causing spark error

Minimal Code To Reproduce

import pandas as pd
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

# schema: a:datetime
def t(sdf:pd.DataFrame) -> pd.DataFrame:
    sdf["a"]=pd.to_datetime(sdf["a"])
    return sdf

with FugueSQLWorkflow(SparkExecutionEngine()) as dag:
    dag.df([["2020-01-01"]], "a:str").transform(t).show()

Describe the bug

TypeError: field a: TimestampType can not accept object Timestamp('2020-01-01 00:00:00') in type <class 'pandas._libs.tslibs.timestamps.Timestamp'>

Expected behavior
This should work.
We should also add datetime tests to the general execution engine test suites.

Environment (please complete the following information):

  • Backend: Spark
  • Backend version: 3
  • Python version: 3.6.9
  • OS: Linux
