dask-contrib / dask-sql
Distributed SQL Engine in Python using Dask
Home Page: https://dask-sql.readthedocs.io/
License: MIT License
Hi @nils-braun,
As you already know, I mistakenly opened this issue on the dask-docker repo and you were kindly alerted by @jrbourbeau.
I will copy/paste my original post here, as well as your initial answer (thank you for your quick reply).
Here is my original post:
####################################################################
What happened:
After installing Java and dask-sql using pip, whenever I try to run a SQL query from my python code I get the following error:
...
File "/home/vquery/.local/lib/python3.8/site-packages/dask_sql/context.py", line 378, in sql
rel, select_names, _ = self._get_ral(sql)
File "/home/vquery/.local/lib/python3.8/site-packages/dask_sql/context.py", line 515, in _get_ral
nonOptimizedRelNode = generator.getRelationalAlgebra(validatedSqlNode)
java.lang.java.lang.IllegalStateException: java.lang.IllegalStateException: Unable to instantiate java compiler
...
...
File "JaninoRelMetadataProvider.java", line 426, in org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile
File "CompilerFactoryFactory.java", line 61, in org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory
java.lang.java.lang.NullPointerException: java.lang.NullPointerException
What you expected to happen:
I should get a dataframe as a result.
Minimal Complete Verifiable Example:
# The cluster/client setup is done first, in another module, not the one executing the SQL query
# I also tried other cluster/scheduler types, with the same error
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
    dashboard_address=':8787',
    asynchronous=False,
    memory_limit='1GB',
)
client = Client(cluster)
# The SQL code is executed in its own module
import dask.dataframe as dd
from dask_sql import Context
c = Context()
df = dd.read_parquet('/vQuery/files/results/US_Accidents_June20.parquet')
c.register_dask_table(df, 'df')
df = c.sql("""select ID, Source from df""") # This line fails with the error reported
Anything else we need to know?:
As mentioned in the code snippet above, due to the way my application is designed, the Dask client/cluster setup is done before the dask-sql context is created.
Environment:
Install steps
$ sudo apt install default-jre
$ sudo apt install default-jdk
$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)
$ javac -version
javac 11.0.10
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk-amd64
$ pip install dask-sql
$ pip list | grep dask-sql
dask-sql 0.3.1
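A quick way to check which JVM the Python process will actually pick up (a minimal sketch using jpype, which dask-sql uses under the hood to start the JVM):
import os
import jpype

# Sketch: if the resolved JVM does not point into the Java 11 installation above,
# a mismatching JVM being loaded can explain "Unable to instantiate java compiler".
print(os.environ.get("JAVA_HOME"))
print(jpype.getDefaultJVMPath())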
These operations need complex types (e.g. collections).
Taken from here and ff.
Value Constructors
Collection Operations
Period Operations
Implement more advanced (but required) concepts:
GitHub allows you to put a link to the documentation at the top of the page.
We could put https://dask-sql.readthedocs.io/en/latest/ there.
I'm taking a look at https://dask-sql.readthedocs.io/en/latest/pages/data_input.html#load-hive-data and I'm curious if a variant exists to connect to a Databricks cluster (https://docs.microsoft.com/en-us/azure/databricks/integrations/bi/jdbc-odbc-bi) to extract tables/info from the Hive metastore.
https://stackoverflow.com/a/61806107/6046019 may also be relevant
This could very well be difficult to implement, but the ANALYZE query would be useful to have. For reference, this is what Spark currently supports with respect to ANALYZE:
https://spark.apache.org/docs/3.0.0/sql-ref-syntax-aux-analyze-table.html
The SQL server is definitely just a hacky proof-of-concept so far.
It can be improved in multiple ways:
When I run the following code I get a TypeError:
from dask_sql import Context
import pandas as pd
df = pd.DataFrame({'name':['ifooi','ibooi','tbooi',None],
'number':[1,2,3,4]})
c = Context()
c.register_dask_table(df, 'table')
c.sql("""
SELECT * FROM "table" as s
WHERE s.name NOT LIKE '%foo%'
""")
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-186-e48fca4ecf21> in <module>
----> 1 c.sql("""
2 SELECT * FROM "table" as s
3 WHERE s.name NOT LIKE '%foo%'
4 """)
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql)
261 try:
262 rel, select_names = self._get_ral(sql)
--> 263 dc = RelConverter.convert(rel, context=self)
264 except (ValidationException, SqlParseException) as e:
265 logger.debug(f"Original exception raised by Java:\n {e}")
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
54 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
55 )
---> 56 df = plugin_instance.convert(rel, context=context)
57 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
58 return df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
23 ) -> DataContainer:
24 # Get the input of the previous step
---> 25 (dc,) = self.assert_inputs(rel, 1, context)
26
27 df = dc.df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
81 from dask_sql.physical.rel.convert import RelConverter
82
---> 83 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
84
85 @staticmethod
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
81 from dask_sql.physical.rel.convert import RelConverter
82
---> 83 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
84
85 @staticmethod
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
54 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
55 )
---> 56 df = plugin_instance.convert(rel, context=context)
57 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
58 return df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in convert(self, rel, context)
26 # we just need to apply it here
27 condition = rel.getCondition()
---> 28 df_condition = RexConverter.convert(condition, dc, context=context)
29 df = df[df_condition]
30
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/convert.py in convert(cls, rex, dc, context)
60 )
61
---> 62 df = plugin_instance.convert(rex, dc, context=context)
63 logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
64 return df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in convert(self, rex, dc, context)
460 f"Executing {operator_name} on {[str(LoggableDataFrame(df)) for df in operands]}"
461 )
--> 462 return operation(*operands)
463
464 # TODO: We have information on the typing here - we should use it
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in __call__(self, *operands)
28 def __call__(self, *operands) -> Union[dd.Series, Any]:
29 """Call the stored function"""
---> 30 return self.f(*operands)
31
32 def of(self, op: "Operation") -> "Operation":
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in not_(self, df)
164 """
165 if is_frame(df):
--> 166 return ~df
167 else:
168 return not df # pragma: no cover
~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/generic.py in __invert__(self)
1471 return self
1472
-> 1473 new_data = self._data.apply(operator.invert)
1474 result = self._constructor(new_data).__finalize__(self)
1475 return result
~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/internals/managers.py in apply(self, f, filter, **kwargs)
438
439 if callable(f):
--> 440 applied = b.apply(f, **kwargs)
441 else:
442 applied = getattr(b, f)(**kwargs)
~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/internals/blocks.py in apply(self, func, **kwargs)
388 """
389 with np.errstate(all="ignore"):
--> 390 result = func(self.values, **kwargs)
391
392 if is_extension_array_dtype(result) and result.ndim > 1:
TypeError: bad operand type for unary ~: 'NoneType'
If I remove "NOT", then I get a ValueError:
from dask_sql import Context
import pandas as pd
df = pd.DataFrame({'name':['ifooi','ibooi','tbooi',None],
'number':[1,2,3,4]})
c = Context()
c.register_dask_table(df, 'table')
c.sql("""
SELECT * FROM "table" as s
WHERE s.name LIKE '%foo%'
""")
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-167-19ac24a5e2b5> in <module>
7 c.register_dask_table(df, 'table')
8
----> 9 c.sql("""
10 SELECT * FROM "table" as s
11 WHERE s.name LIKE '%foo%'
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql)
261 try:
262 rel, select_names = self._get_ral(sql)
--> 263 dc = RelConverter.convert(rel, context=self)
264 except (ValidationException, SqlParseException) as e:
265 logger.debug(f"Original exception raised by Java:\n {e}")
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
54 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
55 )
---> 56 df = plugin_instance.convert(rel, context=context)
57 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
58 return df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
23 ) -> DataContainer:
24 # Get the input of the previous step
---> 25 (dc,) = self.assert_inputs(rel, 1, context)
26
27 df = dc.df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
81 from dask_sql.physical.rel.convert import RelConverter
82
---> 83 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
84
85 @staticmethod
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
81 from dask_sql.physical.rel.convert import RelConverter
82
---> 83 return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
84
85 @staticmethod
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
54 f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
55 )
---> 56 df = plugin_instance.convert(rel, context=context)
57 logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
58 return df
~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in convert(self, rel, context)
27 condition = rel.getCondition()
28 df_condition = RexConverter.convert(condition, dc, context=context)
---> 29 df = df[df_condition]
30
31 cc = self.fix_column_to_row_type(cc, rel.getRowType())
~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/frame.py in __getitem__(self, key)
2788
2789 # Do we have a (boolean) 1d indexer?
-> 2790 if com.is_bool_indexer(key):
2791 return self._getitem_bool_array(key)
2792
~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/common.py in is_bool_indexer(key)
134 na_msg = "Cannot mask with non-boolean array containing NA / NaN values"
135 if isna(key).any():
--> 136 raise ValueError(na_msg)
137 return False
138 return True
ValueError: Cannot mask with non-boolean array containing NA / NaN values
Adding the line 'OR s.name IS NULL' resolves the issue only in the second example.
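For reference, the underlying pandas behaviour can be seen directly; the na= handling below mirrors what a fix inside dask-sql could look like, but is only an illustration:
import pandas as pd

s = pd.Series(['ifooi', 'ibooi', 'tbooi', None])
print(s.str.contains('foo'))            # True, False, False, NaN -> this mask can neither be inverted nor used as an indexer
mask = s.str.contains('foo', na=False)  # treat NULL as "does not match", as SQL does for LIKE
print(s[mask])                          # LIKE '%foo%'
print(s[~mask])                         # NOT LIKE '%foo%'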
From @timhdesilva
So I have a large dataset (50GB) that needs to be merged with a small dataset that is a Pandas dataframe. Prior to the merge, I need to perform a groupby operation on the large dataset. Using Dask, I have been able to perform the groupby operation on the large dataset (which is a Dask dataframe). When I then merge the two datasets using X.merge(Y), I have no issues. The problem is that I need to perform a merge that is not exact (i.e. one column between two others), which is why I'm turning to dask-sql. When I try to do the merge with dask-sql though, I get a memory error (the number of observations should only be ~10x that of the exact merge, so memory shouldn't be a problem).
Any ideas here? I'm thinking somehow the issue might be that I am performing a groupby operation on the Dask dataframe prior to the dask-sql merge. Is this allowed, i.e. can one do a groupby without executing it before using the dask-sql create_table() command and then perform a dask-sql merge with c.sql?
Could see how they do it in dask
When installing dask-sql via conda, Java is also installed as a dependency. This sets the $JAVA_HOME environment variable to the correct path, so that Java applications pick up the correct Java installation.
However, it can (of course) only do this for newly spawned processes.
If you install dask-sql in e.g. a Jupyter notebook (via "! conda install dask-sql"), it won't work.
In this case, JAVA_HOME needs to be set explicitly to $CONDA_PREFIX.
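A minimal sketch of the notebook workaround (assuming a conda environment; the variable must be set before dask_sql is imported):
import os

# point JAVA_HOME at the conda environment so the bundled JVM is found
os.environ["JAVA_HOME"] = os.environ["CONDA_PREFIX"]

from dask_sql import Context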
So far, aggregation only works without windowing (i.e. without the OVER keyword).
Unfortunately, I still do not know how to implement the very broad windowing techniques from SQL in dask.
When there is a SQL validation error, the error is shown on the screen. However, we always get a nested Java and Python exception, which might lead to confusion.
The blazingSQL project created their own SQL exception in Java for this. We might also get along with just catching and parsing the Java exception in Python and printing more useful information to the user in the Context.sql function.
Currently, dask-sql can only handle descending sorting, as the set_index function can not handle anything else.
This might be possible to overcome, e.g. by using a clever way of column redefinitions or by implementing a custom function (instead of set_index).
Starting from a random timeseries table, I played a bit with the where clause and got some unexpected results:
from dask_sql import Context
import dask.datasets
df = dask.datasets.timeseries()
c = Context()
c.create_table("timeseries", df)
In [26]: c.sql("SELECT *\nFROM timeseries AS q48\nWHERE True")
Out[26]:
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: getitem, 210 tasks
However, with False, (0 = 1), and (1 = 1) there are failures:
c.sql("SELECT *\nFROM timeseries AS q48\nWHERE (0 = 1)")
...
pandas/_libs/tslibs/timestamps.pyx in pandas._libs.tslibs.timestamps.Timestamp.__new__()
pandas/_libs/tslibs/conversion.pyx in pandas._libs.tslibs.conversion.convert_to_tsobject()
TypeError: Cannot convert input [False] of type <class 'numpy.bool_'> to Timestamp
The above is not totally unexpected -- a pandas dataframe cannot perform df[True]. However, with SQL I believe WHERE Boolean is valid. Is it reasonable to expect dask-sql to handle this cleanly even though dask/pandas do not?
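On the dask side, one conceivable way to handle a constant predicate is to broadcast it into a full boolean mask first (a sketch only, not how dask-sql currently implements the filter step):
import dask.datasets

df = dask.datasets.timeseries()

# result of evaluating a constant predicate such as (0 = 1)
scalar_condition = False
# broadcast the scalar into a boolean series aligned with the dataframe
mask = (df.x == df.x) & scalar_condition
print(len(df[mask].compute()))  # 0 rows, as one would expect for WHERE FALSE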
The RexCallPlugin currently can only handle the most common expressions (binary operations, OR and AND). There are many more of them that could and should be implemented.
The LogicalAggregatePlugin currently can not handle all aggregation functions.
From the Calcite documentation, here are all the aggregation functions that Calcite understands:
Operations on collection types:
Authorization Plugins
I would love it if these two projects could merge their great features, to become even more powerful tools.
Great work with this library! I ran into one small snag:
dask-sql==0.1.dev13+gdf4572c
Unable to start the JVM due to --illegal-access=deny. Once I remove that option, the library seems to work fine:
https://github.com/nils-braun/dask-sql/blob/main/dask_sql/java.py#L15
This might be interesting for astronomy users.
https://www.ivoa.net/documents/ADQL/20180112/PR-ADQL-2.1-20180112.html
As the server speaks the Presto protocol, which is understood by most BI tools, it would be nice to test and showcase it with some of them.
For example, I have done a very quick test with both Hue and Metabase, which look promising - but have also shown some additional quirks in the server implementation.
https://docs.dask.org/en/latest/dataframe-sql.html explicitly says that dask does not "implement" SQL. But now it does! This is a reminder to update that page, when this package feels ready enough to be publicised.
I wonder if this is useful: https://github.com/IntelPython/sdc
ModuleNotFoundError: No module named 'importlib.metadata'
$ python -V
Python 3.7.6
$ pip install --upgrade -r requirements.txt
Requirement already up-to-date: dask in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 1)) (2021.1.1)
Requirement already up-to-date: dask-sql in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 3)) (0.3.0)
Requirement already up-to-date: distributed in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 4)) (2021.1.1)
Requirement already up-to-date: importlib-metadata in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 5)) (3.4.0)
Requirement already satisfied, skipping upgrade: pyyaml in ./.venv/lib/python3.7/site-packages (from dask->-r requirements.txt (line 1)) (5.4.1)
Requirement already satisfied, skipping upgrade: jpype1>=1.0.2 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (1.2.1)
Requirement already satisfied, skipping upgrade: tzlocal>=2.1 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (2.1)
Requirement already satisfied, skipping upgrade: fastapi>=0.61.1 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (0.63.0)
Requirement already satisfied, skipping upgrade: pygments in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (2.7.4)
Requirement already satisfied, skipping upgrade: uvicorn>=0.11.3 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (0.13.3)
Requirement already satisfied, skipping upgrade: pandas<1.2.0 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (1.1.5)
Requirement already satisfied, skipping upgrade: prompt-toolkit in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (3.0.14)
Requirement already satisfied, skipping upgrade: zict>=0.1.3 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (2.0.0)
Requirement already satisfied, skipping upgrade: psutil>=5.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (5.8.0)
Requirement already satisfied, skipping upgrade: click>=6.6 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (7.1.2)
Requirement already satisfied, skipping upgrade: sortedcontainers!=2.0.0,!=2.0.1 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (2.3.0)
Requirement already satisfied, skipping upgrade: setuptools in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (41.2.0)
Requirement already satisfied, skipping upgrade: tornado>=5; python_version < "3.8" in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (6.1)
Requirement already satisfied, skipping upgrade: toolz>=0.8.2 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (0.11.1)
Requirement already satisfied, skipping upgrade: msgpack>=0.6.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.0.2)
Requirement already satisfied, skipping upgrade: cloudpickle>=1.5.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.6.0)
Requirement already satisfied, skipping upgrade: tblib>=1.6.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.7.0)
Requirement already satisfied, skipping upgrade: typing-extensions>=3.6.4; python_version < "3.8" in ./.venv/lib/python3.7/site-packages (from importlib-metadata->-r requirements.txt (line 5)) (3.7.4.3)
Requirement already satisfied, skipping upgrade: zipp>=0.5 in ./.venv/lib/python3.7/site-packages (from importlib-metadata->-r requirements.txt (line 5)) (3.4.0)
Requirement already satisfied, skipping upgrade: pytz in ./.venv/lib/python3.7/site-packages (from tzlocal>=2.1->dask-sql->-r requirements.txt (line 3)) (2020.5)
Requirement already satisfied, skipping upgrade: pydantic<2.0.0,>=1.0.0 in ./.venv/lib/python3.7/site-packages (from fastapi>=0.61.1->dask-sql->-r requirements.txt (line 3)) (1.7.3)
Requirement already satisfied, skipping upgrade: starlette==0.13.6 in ./.venv/lib/python3.7/site-packages (from fastapi>=0.61.1->dask-sql->-r requirements.txt (line 3)) (0.13.6)
Requirement already satisfied, skipping upgrade: h11>=0.8 in ./.venv/lib/python3.7/site-packages (from uvicorn>=0.11.3->dask-sql->-r requirements.txt (line 3)) (0.12.0)
Requirement already satisfied, skipping upgrade: numpy>=1.15.4 in ./.venv/lib/python3.7/site-packages (from pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (1.19.5)
Requirement already satisfied, skipping upgrade: python-dateutil>=2.7.3 in ./.venv/lib/python3.7/site-packages (from pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (2.8.1)
Requirement already satisfied, skipping upgrade: wcwidth in ./.venv/lib/python3.7/site-packages (from prompt-toolkit->dask-sql->-r requirements.txt (line 3)) (0.2.5)
Requirement already satisfied, skipping upgrade: heapdict in ./.venv/lib/python3.7/site-packages (from zict>=0.1.3->distributed->-r requirements.txt (line 4)) (1.0.1)
Requirement already satisfied, skipping upgrade: six>=1.5 in ./.venv/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (1.15.0)
$ java --version
openjdk 14.0.2 2020-07-14
OpenJDK Runtime Environment (build 14.0.2+12-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 14.0.2+12-Ubuntu-120.04, mixed mode, sharing)
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 20.04.1 LTS
Release: 20.04
Codename: focal
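For reference, importlib.metadata only exists in the standard library from Python 3.8 on; the usual compatibility pattern (a sketch, assuming the importlib-metadata backport listed above is installed) looks like this:
try:
    from importlib.metadata import version  # Python >= 3.8
except ImportError:
    from importlib_metadata import version  # backport for Python 3.7

print(version("dask-sql"))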
With the most recent additions, the SQL statements that dask-sql understands have grown. It might make sense to add a page for every SQL statement instead of having them all on a single documentation page.
Apache Hue has an integration for dask-sql, where the commands are autocompleted and highlighted. Currently, this is out of sync due to the recently added new SQL features (e.g. CREATE MODEL).
Descriptions:
NotImplementedError: The python type datetime64[ns, UTC] is not implemented (yet)
raised from this function: dask_sql/mappings.py, line 78, in python_to_sql_type
Steps to reproduce the error:
from dask_sql import Context
c = Context()
import dask.dataframe as ddf
user_df = ddf.read_sql_table(table="auth_user", schema="public", uri="postgresql://username:password@localhost:5432/dbname",index_col='id')
c.create_table("pg.public.auth_user", user_df)
c.sql("SELECT * FROM pg.public.auth_user")
Notes:
I'm using these packages:
The rules currently implemented in the RelationalAlgebraGenerator Java class are a mixture of the standard rules and the ones from the blazingSQL project.
So far, they have worked quite well. However, it might be good to check if we have the best set of rules for optimization in place.
$ git clone https://github.com/nils-braun/dask-sql.git
$ cd dask-sql
$ pytest tests
ERROR: usage: pytest [options] [file_or_dir] [file_or_dir] [...]
pytest: error: unrecognized arguments: --cov --cov-config=.coveragerc tests
inifile: /mnt/d/Programs/dask/dask-sql/pytest.ini
rootdir: /mnt/d/Programs/dask/dask-sql
$ python setup.py java
running java
Traceback (most recent call last):
File "setup.py", line 93, in <module>
command_options={"build_sphinx": {"source_dir": ("setup.py", "docs"),}},
File "/home/saulo/anaconda3/lib/python3.7/site-packages/setuptools/__init__.py", line 165, in setup
return distutils.core.setup(**attrs)
File "/home/saulo/anaconda3/lib/python3.7/distutils/core.py", line 148, in setup
dist.run_commands()
File "/home/saulo/anaconda3/lib/python3.7/distutils/dist.py", line 966, in run_commands
self.run_command(cmd)
File "/home/saulo/anaconda3/lib/python3.7/distutils/dist.py", line 985, in run_command
cmd_obj.run()
File "setup.py", line 30, in run
self.announce(f"Running command: {' '.join(command)}", level=distutils.log.INFO)
TypeError: sequence item 0: expected str instance, NoneType found
$ python dask-sql-test.py
Traceback (most recent call last):
File "dask-sql-test.py", line 1, in <module>
from dask_sql import Context
File "/mnt/d/Programs/dask/dask-sql/dask_sql/__init__.py", line 1, in <module>
from .context import Context
File "/mnt/d/Programs/dask/dask-sql/dask_sql/context.py", line 9, in <module>
from dask_sql.java import (
File "/mnt/d/Programs/dask/dask-sql/dask_sql/java.py", line 88, in <module>
DaskTable = com.dask.sql.schema.DaskTable
AttributeError: Java package 'com' has no attribute 'dask'
$ python -V
Python 3.7.6
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 20.04.1 LTS
Release: 20.04
Codename: focal
$ java -version
openjdk version "14.0.2" 2020-07-14
OpenJDK Runtime Environment (build 14.0.2+12-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 14.0.2+12-Ubuntu-120.04, mixed mode, sharing)
I know they are mostly useless, but they still might add some value. Have any been done?
The RelDataType also has all the information on the types of the columns. Currently, we rely on dask to create the correct type - but we won't get subtle differences such as int32 vs. int64 with that.
Therefore, a type conversion for each column should be implemented (and checked) in the fix_column_to_row_type and check_columns_from_row_type functions.
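A minimal sketch of what such a per-column conversion could look like (the expected_dtypes mapping is assumed to be derived from the RelDataType; the helper name is illustrative):
import dask.dataframe as dd

def cast_to_row_type(df: dd.DataFrame, expected_dtypes: dict) -> dd.DataFrame:
    # cast every column to the dtype declared in the SQL row type,
    # instead of relying on whatever dask inferred
    for column, dtype in expected_dtypes.items():
        if df[column].dtype != dtype:
            df[column] = df[column].astype(dtype)
    return df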
The ML implementation is still a bit experimental - we can improve on this:
SHOW MODELS and DESCRIBE MODEL
After a very nice discussion with @felipeblazing (thanks again for reaching out!), we decided to unify the APIs of dask-sql and blazing-sql, so that users can switch between the different packages more easily (and therefore switch between "normal" CPU dask and GPU-enabled rapids). As blazing has been around much longer already, I think it would make sense to use blazing's API as the target for dask-sql.
The entrypoint to all applications with dask-sql or blazing is to instantiate a context and then call methods on it.
blazing currently has the following methods on the context:
File System:
.s3
.localfs
.hdfs
.gs
.show_filesystems
SQL:
.explain
.create_table
.drop_table
.sql
The SQL methods can be implemented quite quickly, as they are basically already present (just with different names).
The FS methods are very interesting to have (see #28), so they should definitely come next.
There is another method, .log, which however only makes sense in the blazing context and is very hard to port.
Apart from the context API, there is of course also the very important question of whether the same SQL queries are understood.
That should be handled next.
Currently, dask-sql can only be installed from source. We should find out if uploading the packaged jar (contained in a wheel) together with the Python code makes sense, and if and how we can create a conda package (probably via conda-forge).
Reminder for later. Currently
SELECT * from $table WHERE col_b IS NOT NULL
results in:
try:
operation = self.OPERATION_MAPPING[operator_name]
except KeyError:
> raise NotImplementedError(f"{operator_name} not (yet) implemented")
E NotImplementedError: IS NOT NULL not (yet) implemented
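A sketch of what the missing operations could look like (assuming, as the excerpt above suggests, that operations are plain callables registered in the operation mapping):
import dask.dataframe as dd

def is_null(series: dd.Series) -> dd.Series:
    # SQL NULL corresponds to NaN/NaT/None on the dask side
    return series.isnull()

def is_not_null(series: dd.Series) -> dd.Series:
    return series.notnull()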
Currently, all data frames need to be registered before they can be used in dask-sql.
However, it could also be interesting to have dataframes directly from S3 (or any other storage, such as HDFS), from the Hive metastore, or by creating temporary views. In the background, one could create dask dataframes and use the normal registration process.
For this, we first need to come up with a good SQL syntax (which is supported by Calcite) and/or an API for this.
I mistakenly missed the quotes around a name in my query
import dask
import dask_sql
context = dask_sql.Context()
df = dask.datasets.timeseries()
context.register_dask_table(df, "timeseries")
context.sql("SELECT * from timeseries where name=Tim")
And I got a scary traceback
---------------------------------------------------------------------------
org.apache.calcite.sql.validate.SqlValidatorExceptionTraceback (most recent call last)
org.apache.calcite.sql.validate.SqlValidatorException: org.apache.calcite.sql.validate.SqlValidatorException: Column 'Tim' not found in any table
The above exception was the direct cause of the following exception:
org.apache.calcite.runtime.CalciteContextExceptionTraceback (most recent call last)
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getRelationalAlgebra()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getValidatedNode()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.prepare.PlannerImpl.validate()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validate()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlSelect.validate()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.AbstractNamespace.validate()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SelectNamespace.validateImpl()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.expand()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlCall.accept()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlScopedShuttle.visit()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlScopedShuttle.visit()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlOperator.acceptCall()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlIdentifier.accept()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.DelegatingScope.fullyQualify()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlUtil.newContextException()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlUtil.newContextException()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.runtime.Resources$ExInstWithCause.ex()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in java.lang.reflect.Constructor.newInstance()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in java.lang.reflect.Constructor.newInstanceWithCaller()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0()
org.apache.calcite.runtime.CalciteContextException: org.apache.calcite.runtime.CalciteContextException: From line 1, column 37 to line 1, column 39: Column 'Tim' not found in any table
The above exception was the direct cause of the following exception:
Exception Traceback (most recent call last)
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getRelationalAlgebra()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getValidatedNode()
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.prepare.PlannerImpl.validate()
Exception: Java Exception
The above exception was the direct cause of the following exception:
org.apache.calcite.tools.ValidationExceptionTraceback (most recent call last)
<ipython-input-8-8095ddfbf467> in <module>
----> 1 context.sql("SELECT * from timeseries where name=Tim")
~/miniconda/lib/python3.7/site-packages/dask_sql/context.py in sql(self, sql, debug)
71 """
72 # TODO: show a nice error message if something is broken
---> 73 rel = self._get_ral(sql, debug=debug)
74 df = RelConverter.convert(rel, tables=self.tables)
75 return df
~/miniconda/lib/python3.7/site-packages/dask_sql/context.py in _get_ral(self, sql, debug)
94 generator = RelationalAlgebraGenerator(schema)
95
---> 96 rel = generator.getRelationalAlgebra(sql)
97 if debug: # pragma: no cover
98 print(generator.getRelationalAlgebraString(rel))
org.apache.calcite.tools.ValidationException: org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: From line 1, column 37 to line 1, column 39: Column 'Tim' not found in any table
I wonder if we maybe want to capture these exceptions and re-raise them from a fresh, much shorter stack.
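A rough sketch of that idea (names are illustrative, not the final API): catch the jpype-wrapped Calcite exception and re-raise a short, Python-only error instead of showing the whole JVM stack:
class ParsingException(Exception):
    """Short, Python-only error shown instead of the nested JVM traceback."""

def get_ral_safe(context, sql_string):
    try:
        return context._get_ral(sql_string)
    except Exception as e:  # the jpype-wrapped ValidationException
        # "from None" drops the long Java cause chain from the traceback
        raise ParsingException(f"Can not parse the given SQL: {e}") from None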
import dask
import dask_sql
context = dask_sql.Context()
df = dask.datasets.timeseries()
context.register_dask_table(df, "timeseries")
context.sql("SELECT sum(x), sum(y) from timeseries").compute()
EXPR$0 EXPR$1
0 -1072.053022 762.750336
A user can do:
CREATE table demo as select 1 as n;
But then how to delete it?
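A sketch of what the counterpart could look like (assuming the API mirrors the blazingSQL-style methods discussed elsewhere on this page; neither form is confirmed here):
c.drop_table("demo")      # hypothetical context method
c.sql("DROP TABLE demo")  # hypothetical SQL form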
Would be useful to have a cluster available for someone who wants to quickly try. I'll be happy to help here.
Trying to select a column from a table, I get a ParsingException.
SELECT * works:
>>> c.sql("""SELECT * from my_data""")
Dask DataFrame Structure:
UUID_SAILING a b
npartitions=1
object float64 float64
... ... ...
Dask Name: getitem, 6 tasks
But when trying to select a column (UUID_SAILING) I get
>>> c.sql("""SELECT UUID_SAILING from my_data""")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\context.py", line 346, in sql
rel, select_names, _ = self._get_ral(sql)
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\context.py", line 466, in _get_ral
raise ParsingException(sql, str(e.message())) from None
dask_sql.utils.ParsingException: Can not parse the given SQL: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 19: Column 'uuid_sailing' not found in any table; did you mean 'UUID_SAILING'?
The problem is probably somewhere here:
SELECT UUID_SAILING from my_data
^^^^^^^^^^^^
I believe what is happening is that it expects quotation marks around the field name:
>>> c.sql("""SELECT * from my_data""").columns
Index(['UUID_SAILING', 'a', 'b'], dtype='object')
>>> c.sql("""SELECT 'UUID_SAILING' from my_data""")
Dask DataFrame Structure:
'UUID_SAILING'
npartitions=1
object
...
Dask Name: getitem, 5 tasks
Note: I'm on Windows and the table was created using
c.create_table("my_data", remote_folder + "databricks_out_tmp/file.parquet", storage_options=storage_options)
Also, this isn't reproducible using the timeseries dataset as a dataframe:
>>> from dask.datasets import timeseries
>>> df = timeseries()
>>> c.create_table("timeseries", df)
>>> c.sql("""SELECT * from timeseries""").columns
Index(['id', 'name', 'x', 'y'], dtype='object')
>>> c.sql("""SELECT * from timeseries""")
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int32 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: getitem, 210 tasks
>>> c.sql("""SELECT id from timeseries""")
Dask DataFrame Structure:
id
npartitions=30
2000-01-01 int32
2000-01-02 ...
... ...
2000-01-30 ...
2000-01-31 ...
Dask Name: getitem, 120 tasks
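For reference, the unquoted identifier is folded (to lower case here, as the error message shows), so a mixed- or upper-case column name has to be double-quoted; single quotes produce a string literal, which is why the 'UUID_SAILING' query above returns a constant column. A sketch of the two forms:
c.sql('SELECT "UUID_SAILING" FROM my_data')   # double quotes: case-sensitive column reference
c.sql("SELECT 'UUID_SAILING' FROM my_data")   # single quotes: just the string literal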
Judging from the example in the docs, right now dask-sql only supports:
Is there any possibility right now to create a table from a SQL database so that queries can get live data?
Maybe I misread the docs.
Thank you.
Currently
SELECT * from $table WHERE NOT(col_b = 123.0)
results in
def convert(
self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame
) -> Union[dd.Series, Any]:
# Prepare the operands by turning the RexNodes into python expressions
operands = [RexConverter.convert(o, df) for o in rex.getOperands()]
# Now use the operator name in the mapping
operator_name = str(rex.getOperator().getName())
try:
> operation = self.OPERATION_MAPPING[operator_name]
E KeyError: '<>'
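A minimal sketch of the missing entry (assuming, as the excerpt shows, that the operation mapping maps Calcite operator names to callables):
import operator

OPERATION_MAPPING = {
    "=": operator.eq,
    "<>": operator.ne,  # the SQL not-equal operator that raised the KeyError above
}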
Thanks to @mrocklin, I have learned that using the delayed bindings introduces a performance bottleneck.
In this part, I still need them. It would be nice to remove them there.
Originally posted by @nils-braun in #75 (comment)
There are many SQL types and corresponding Python types still missing from the SQL-to-Python mappings in mappings.py.
Those can be added one by one.
So far, basically all standard SELECT statements can be handled, because most of the RelNodes and RexNodes Apache Calcite will produce are already covered. However, there exist more of them, and more advanced use cases might trigger them (leading to a NotImplementedError so far).
I would suggest first finding SQL use cases where those other classes are triggered, and then using them as test cases for the implementation.
Here is the list of Java classes in the Apache Calcite project under rel/logical and whether they still need to be implemented:
WITHIN GROUP and ORDER BY are not implemented in the groupby aggregations.
It seems the boolean conversion is broken for some inputs...
When doing from dask_sql import Context, it brings up:
UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. If this is by mistake, set $JAVA_HOME to $CONDA_PREFIX.
which comes from https://github.com/nils-braun/dask-sql/blob/02e2dad28741e4da84b3a957c0eedee40e262431/dask_sql/utils.py#L31
However, on the command line I see
>>> import os
>>> os.environ["CONDA_PREFIX"]
'C:\\Users\\131416\\AppData\\Local\\Continuum\\anaconda3\\envs\\ds-rm-obr-dev'
>>> os.environ["JAVA_HOME"]
'C:\\Users\\131416\\AppData\\Local\\Continuum\\anaconda3\\envs\\ds-rm-obr-dev\\Library'
>>> from dask_sql import Context
C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\utils.py:30: UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. If this is by mistake, set $JAVA_HOME to $CONDA_PREFIX.
warnings.warn(
Perhaps the check in https://github.com/nils-braun/dask-sql/blob/main/dask_sql/utils.py#L29 should be something like os.environ["JAVA_HOME"] != os.environ["CONDA_PREFIX"] + 'Library'
Note: I'm also on Windows, hence the backslashes.
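A sketch of what an adjusted check could look like (assumption: on Windows the conda-provided JDK lives under %CONDA_PREFIX%\Library):
import os

java_home = os.environ.get("JAVA_HOME", "")
conda_prefix = os.environ.get("CONDA_PREFIX", "")
# accept both the environment root and its Library subdirectory (Windows)
in_conda_env = java_home in (conda_prefix, os.path.join(conda_prefix, "Library"))
if conda_prefix and not in_conda_env:
    print("JAVA_HOME is not pointing into the conda environment")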
Currently, many data sources can be accessed via the usual context.create_table methods, but not so many via the CREATE TABLE SQL call, as there is e.g. no possibility to create a sqlalchemy cursor via SQL commands.
That should be changed: e.g. when the user passes a known sqlalchemy connection string to the CREATE TABLE call, we can directly use it to create the table.
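A sketch of what such a statement could look like (the WITH-option keys here are purely illustrative, not an existing dask-sql syntax):
c.sql("""
    CREATE TABLE auth_user WITH (
        format = 'sqlalchemy',
        connection_string = 'postgresql://user:***@localhost:5432/dbname',
        table_name = 'auth_user'
    )
""")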