trinodb / trino
Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Home Page: https://trino.io
License: Apache License 2.0
Per the spec (6.13 General Rules 11.c):
11) If TD is variable-length character string or large object character string, then let MLTD
be the maximum length in characters of TD.
c) If SD is fixed-length character string, variable-length character string,
or large object character string, then
ii) If the length in characters of SV is larger than LTD, then TV is the first LTD characters
of SV. If any of the remaining characters of SV are non-<space> characters,
then a completion condition is raised: warning — string data, right truncation.
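A minimal illustration of the quoted rule (spec semantics only; actual engine behavior may differ and is what this issue concerns):
SELECT CAST('abcdef' AS varchar(3))  -- 'abc'; non-space characters dropped, so a truncation warning is expected
SELECT CAST('abc   ' AS varchar(3))  -- 'abc'; only spaces dropped, so no warning per the rule above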
This is a copy of the issue from the original prestodb repo: prestodb/presto#11869
I'm getting the error:
com.facebook.presto.metadata.OperatorNotFoundException: 'IS DISTINCT FROM' cannot be applied to ObjectId, ObjectId
when trying to select from a view that pulls data from the MongoDB connector, despite 'IS DISTINCT FROM' not being used in the view. I'm assuming there's some background logic that happens with joins/aggregations that uses the 'IS DISTINCT FROM' operator.
The select works in Presto version 0.188 but does not work in version 0.203 when testing a Presto upgrade.
According to the SQL specification, the FILTER clause for an aggregation query should be applied to the rows in the group before the arguments to the aggregation are evaluated.
Currently, expressions in the aggregation arguments are evaluated before the filter, leading to queries failing when they shouldn't.
For example:
select
sum(1 / a) filter (where a <> 0),
sum(a) filter (where true)
from (values (1), (0)) t(a)
fails with "/ by zero" error.
This is the plan it produces:
Fragment 0 [SINGLE]
Output layout: [sum, sum_3]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[_col0, _col1]
│ Layout: [sum:bigint, sum_3:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ _col0 := sum
│ _col1 := sum_3
└─ Aggregate(FINAL)
│ Layout: [sum_3:bigint, sum:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ sum_3 := sum("sum_7")
│ sum := sum("sum_6")
└─ LocalExchange[SINGLE] ()
│ Layout: [sum_7:row(bigint, boolean, bigint, boolean), sum_6:row(bigint, boolean, bigint, boolean)]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ Aggregate(PARTIAL)
│ Layout: [sum_7:row(bigint, boolean, bigint, boolean), sum_6:row(bigint, boolean, bigint, boolean)]
│ sum_7 := sum("a") (mask = expr_1)
│ sum_6 := sum("expr_2") (mask = expr_0)
└─ Project[]
│ Layout: [expr_2:bigint, a:bigint, expr_1:boolean, expr_0:boolean]
│ Estimates: {rows: 2 (44B), cpu: 54, memory: 0B, network: 0B}
│ expr_2 := CAST((1 / "field") AS bigint)
│ a := CAST("field" AS bigint)
│ expr_1 := true
│ expr_0 := ("field" <> 0)
└─ LocalExchange[ROUND_ROBIN] ()
│ Layout: [field:integer]
│ Estimates: {rows: 2 (10B), cpu: 10, memory: 0B, network: 0B}
└─ Values
Layout: [field:integer]
Estimates: {rows: 2 (10B), cpu: 0, memory: 0B, network: 0B}
(1)
(0)
References: SQL spec 10.9 <aggregate function>, General Rules:
d) If AF is immediately contained in a <set function specification> SFS, then the argument source of AF is a group of a grouped table of the aggregation query of SFS.
- Let T be the argument source of AF.
- Case:
  a) If <filter clause> is specified, then the <search condition> is effectively evaluated for each row of T. Let T1 be the collection of rows of T for which the result of the <search condition> is True.
  b) Otherwise, let T1 be T.
- If <general set function> is specified, then:
  a) Let TX be the single-column table that is the result of applying the <value expression> to each row of T1 and eliminating null values
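Until the evaluation order is fixed, a workaround (a sketch, assuming it is acceptable to rewrite the query by hand) is to move the condition into the aggregation argument, since sum ignores NULL inputs:
select
  sum(case when a <> 0 then 1 / a end),  -- CASE yields NULL for a = 0, so the division is never evaluated there
  sum(a)
from (values (1), (0)) t(a)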
The TABLESAMPLE node in the plan for the following query prevents operator fusion:
presto> explain (type distributed) select length(name) from tpch.tiny.nation TABLESAMPLE system (50);
Query Plan
---------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [length]
Output partitioning: SINGLE []
Grouped Execution: false
- Output[_col0] => [length:bigint]
_col0 := length
- RemoteSource[1] => [length:bigint]
Fragment 1 [SOURCE]
Output layout: [length]
Output partitioning: SINGLE []
Grouped Execution: false
- Project[] => [length:bigint]
length := "length"("name")
- Sample[SYSTEM: 0.5] => [name:varchar(25)]
- TableScan[tpch:nation:sf0.01, grouped = false] => [name:varchar(25)]
Cost: {rows: 25 (302B), cpu: 302.00, memory: 0.00, network: 0.00}
name := tpch:name
Issue for effort to support:
The advantage of cross-operator lazy pages is that we can avoid IO when queries are highly selective. This requires that significant processing happens in the source stage, which becomes more and more the case with improvements like CBO ("broadcast joins") or grouped execution.
Stages are:
- PageProcessor on WorkProcessor
- ScanFilterAndProject on WorkProcessor. The pipeline would look as follows:
  split singleton -> [flatMap] -> pages source -> [transform] -> page processor -> [transform] -> merge pages
  or, if the split is cursor based:
  split singleton -> [flatMap] -> cursor source -> [transform] -> merge pages
- FilterAndProject on WorkProcessor. The pipeline would look as follows:
  page buffer -> [transform] -> page processor -> [transform] -> [merge pages]
- connecting WorkProcessor pipelines via a dedicated "gluing" operator
- TopNOperator on WorkProcessor pipelines (fast data exploration!)
See some previous discussion in the old repo: prestodb/presto#12096
Extracted from: prestodb/presto#9916
PrestoS3FileSystem#rename should preserve SSE properties while copying files. The problematic line is PrestoS3FileSystem.java:407: s3.copyObject. According to the S3 Javadoc:
* By default, all object metadata for the source object except
* <b>server-side-encryption</b>, <b>storage-class</b> and
* <b>website-redirect-location</b> are copied to the new destination
* object, unless new object metadata in the specified
* {@link CopyObjectRequest} is provided.
On the other hand, maybe PrestoS3FileSystem#rename should throw UnsupportedOperationException? It seems that it shouldn't be used at all, because for S3 there should be no temporary objects when inserting into a table (see HiveWriteUtils#isS3FileSystem).
The JOIN planning code in RelationPlanner currently has a bunch of complicated logic that attempts to derive and extract the terms of equality comparisons, etc. Now that JoinNode can contain an arbitrary expression as the join criteria, we should simplify this code to not do that, and leave it to the optimizers to push down predicates as necessary.
<delimited identifier> ::=
<double quote> <delimited identifier body> <double quote>
<delimited identifier body> ::= <delimited identifier part>...
<delimited identifier part> ::=
<nondoublequote character>
| <doublequote symbol>
<Unicode delimited identifier> ::=
U <ampersand> <double quote> <Unicode delimiter body> <double quote>
<Unicode escape specifier>
<Unicode escape specifier> ::=
[ UESCAPE <quote> <Unicode escape character> <quote> ]
<Unicode delimiter body> ::=
<Unicode identifier part>...
<Unicode identifier part> ::=
<delimited identifier part>
| <Unicode escape value>
24) For every <identifier body> IB there is exactly one corresponding case-normal form CNF. CNF is an <identifier body> derived from IB as follows:
Let n be the number of characters in IB. For i ranging from 1 (one) to n, the i-th character Mi of IB is transliterated into the corresponding character
or characters of CNF as follows:
Case:
a) If Mi is a lower case character or a title case character for which an equivalent upper case sequence U is defined by Unicode, then let j be the number of characters in U; the next j characters of CNF are U.
b) Otherwise, the next character of CNF is Mi.
25) The case-normal form of the <identifier body> of a <regular identifier> is used for purposes such as and including determination of identifier
equivalence, representation in the Definition and Information Schemas, and representation in diagnostics areas.
...
27) Two <regular identifier>s are equivalent if the case-normal forms of their <identifier body>s, considered as the repetition of a <character string literal>
that specifies a <character set specification> of SQL_IDENTIFIER and an implementation-defined collation IDC that is sensitive to case, compare equally
according to the comparison rules in Subclause 8.2, “<comparison predicate>”.
28) A <regular identifier> and a <delimited identifier> are equivalent if the case-normal form of the <identifier body> of the <regular identifier> and the
<delimited identifier body> of the <delimited identifier> (with all occurrences of <quote> replaced by <quote symbol> and all occurrences of
<doublequote symbol> replaced by <double quote>), considered as the repetition of a <character string literal> that specifies a <character set specification>
of SQL_IDENTIFIER and IDC, compare equally according to the comparison rules in Subclause 8.2, “<comparison predicate>”.
29) Two <delimited identifier>s are equivalent if their <delimited identifier body>s, considered as the repetition of a <character string literal> that specifies
a <character set specification> of SQL_IDENTIFIER and an implementation-defined collation that is sensitive to case, compare equally according to the
comparison rules in Subclause 8.2, “<comparison predicate>”.
30) Two <Unicode delimited identifier>s are equivalent if their <Unicode delimiter body>s, considered as the repetition of a <character string literal> that
specifies a <character set specification> of SQL_IDENTIFIER and an implementation-defined collation that is sensitive to case, compare equally according
to the comparison rules in Subclause 8.2, “<comparison predicate>”.
31) A <Unicode delimited identifier> and a <delimited identifier> are equivalent if their <Unicode delimiter body> and <delimited identifier body>,
respectively, each considered as the repetition of a <character string literal> that specifies a <character set specification> of SQL_IDENTIFIER and
an implementation-defined collation that is sensitive to case, compare equally according to the comparison rules in Subclause 8.2, “<comparison predicate>”.
32) A <regular identifier> and a <Unicode delimited identifier> are equivalent if the case-normal form of the <identifier body> of the <regular identifier>
and the <Unicode delimiter body> of the <Unicode delimited identifier> considered as the repetition of a <character string literal>, each specifying a
<character set specification> of SQL_IDENTIFIER and an implementation-defined collation that is sensitive to case, compare equally according to the
comparison rules in Subclause 8.2, “<comparison predicate>”.
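A small illustration of those equivalence rules (spec semantics as quoted above; aligning Trino's behavior with them is the subject of the wiki page below, and the table name is made up):
select nation, NATION, "NATION" from t   -- per rules 24-28, all three reference the same identifier
select U&"n\0061tion" from t             -- per rule 31, equivalent to "nation" (the default <Unicode escape character> is \)
-- "nation" (delimited, lower case) is distinct from NATION / "NATION" under these rules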
The approach and design is being captured here: https://github.com/prestosql/presto/wiki/Delimited-Identifiers
This should be easy as the trace token is already in ClientSession.
Ensure EXPLAIN <query> works for all the different setups possible, e.g.
Add withGeneratedName to the RecordingHiveMetastore exporter. When the JMX MBean is created via:
newExporter(binder).export(GlueHiveMetastore.class);
it doesn't list, but when created via:
newExporter(binder).export(GlueHiveMetastore.class)
    .as(generator -> generator.generatedNameOf(GlueHiveMetastore.class));
it does list in presto.plugin.hive.metastore.glue.
FYI: @electrum
create table t_si(si smallint, value varchar) with (format = 'PARQUET');
insert into t_si VALUES (SMALLINT '1', 'a');
select * from t_si where si = 1;
IllegalArgumentException: Mismatched Domain types: smallint vs integer
Queries of this shape consume exponential amounts of memory and CPU:
WITH
t1 (v) AS (VALUES 1),
t2 AS( select if(v = 0, v, v) v from t1 ),
t3 AS( select if(v = 0, v, v) v from t2 ),
t4 AS( select if(v = 0, v, v) v from t3 ),
t5 AS( select if(v = 0, v, v) v from t4 ),
t6 AS( select if(v = 0, v, v) v from t5 ),
t7 AS( select if(v = 0, v, v) v from t6 ),
t8 AS( select if(v = 0, v, v) v from t7 ),
t9 AS( select if(v = 0, v, v) v from t8 ),
t10 AS( select if(v = 0, v, v) v from t9 ),
t11 AS( select if(v = 0, v, v) v from t10 ),
t12 AS( select if(v = 0, v, v) v from t11 ),
t13 AS( select if(v = 0, v, v) v from t12 ),
t14 AS( select if(v = 0, v, v) v from t13 ),
t15 AS( select if(v = 0, v, v) v from t14 ),
t16 AS( select if(v = 0, v, v) v from t15 )
select *
from t16
where v = 0
One possible short-term fix is to adjust the inlining heuristics to only do it if the expressions are trivial or appear only once (similar to how the InlineProjections rule works)
The longer-term fix is to move PredicatePushdown to iterative optimizer rules that simply avoid unproductive pushdown actions like the one caused by this query.
Presto supports reading ZSTD ORC data. Writing should be supported as well (hive.compression-codec).
Currently, Trino evaluates named queries (WITH clause) by inlining them wherever they are referenced in the body of the main query. This can cause problems if the query is non-deterministic, since every inlined instance might produce different results.
According to the spec:
1) If a non-recursive <with clause> is specified, then:
...
b) For every <with list element> WLE, let WQN be the <query name> immediately contained in WLE. Let WQE be
the <query expression> simply contained in WLE. Let WLT be the table resulting from evaluation of WQE, with
each column name replaced by the corresponding element of the <with column list>, if any, immediately contained in WLE.
c) Every <table reference> contained in <query expression> that specifies WQN identifies WLT.
This says that each named query should be evaluated once and the result "stored" in a table WLT. Thereafter, any references to the named query directly identify that WLT table.
WLT in the definition above is what the spec calls a "transient table":
4.15.4 Transient tables
A transient table is a named table that may come into existence implicitly during the evaluation of a <query expression> or the execution of a trigger. A transient table is identified by a <query name> if it arises during the evaluation of a <query expression>, or by a <transition table name> if it arises during the execution of a trigger. Such tables exist only for the duration of the executing SQL-statement containing the <query expression> or for the duration of the executing trigger.
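A minimal sketch of the problem the current inlining causes (rand() stands in for any non-deterministic expression):
WITH r AS (SELECT rand() AS x)
SELECT a.x = b.x
FROM r a, r b
-- per the spec, r is evaluated once, so this is always true;
-- with inlining, the two references may evaluate rand() independently and return false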
Presto fails to read Parquet partitions if the decimal datatype doesn't match what is in the Hive metastore. Here is the error:
Query 20190130_224317_00018_w9d29 failed: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'sbnum' in table 'default.presto_test' is declared as type 'decimal(8,0)', but partition 'month=201812' declared column 'sbnum' as type 'decimal(6,0)'.
com.facebook.presto.spi.PrestoException: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'sbnum' in table 'default.presto_test' is declared as type 'decimal(8,0)', but partition 'month=201812' declared column 'sbnum' as type 'decimal(6,0)'.
at com.facebook.presto.hive.HiveSplitManager.lambda$getPartitionMetadata$2(HiveSplitManager.java:315)
at com.google.common.collect.Iterators$6.transform(Iterators.java:788)
at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1340)
at com.facebook.presto.hive.ConcurrentLazyQueue.poll(ConcurrentLazyQueue.java:37)
at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:252)
at com.facebook.presto.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:91)
at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:185)
at com.facebook.presto.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at com.facebook.presto.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at com.facebook.presto.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here are the steps to reproduce this issue -
1. Create an external table -
CREATE external TABLE `presto_test`(
`sbnum` decimal(6,0),
`abnum` decimal(8,0)
)
partitioned by (
month int
)
STORED AS PARQUET
LOCATION
'/hive/presto_test'
;
2. Insert data into this table, creating a few partitions -
insert overwrite table presto_test partition (month=201801)
select sbnum, abnum from <tabname> limit 10
;
insert overwrite table presto_test partition (month=201802)
select sbnum, abnum from <tabname> limit 10
;
3. Access the table from Presto to ensure it works -
select count(1) from presto_test ;
4. Alter the table, changing the data type -
alter table presto_test change sbnum sbnum decimal (8,0) ;
5. Access the table again from Presto -
select count(1) from presto_test ; (should throw an error)
6. Insert one more partition -
insert overwrite table presto_test partition (month=201803)
select sbnum, abnum from <tabname> limit 10
;
7. Access the table from Presto again -
select count(1) from presto_test where month = 201802 ; (should work)
CURRENT_PATH, SET PATH
4.34 SQL-paths
An SQL-path is a list of one or more <schema name>s that determines the search order for one of the following:
— The subject routine of a <routine invocation> whose <routine name> does not contain a <schema name>.
— The user-defined type when the <path-resolved user-defined type name> does not contain a <schema name>.
4.43.3 SQL-session properties
An SQL-session has an SQL-path that is used to effectively qualify unqualified <routine name>s [...]
The SQL-path is initially set to an implementation-defined value, but can subsequently be changed by the successful execution of a <set path statement>.
The text defining the SQL-path can be referenced by using the <general value specification> CURRENT_PATH.
6.4 <value specification> and <target specification>
- The declared type of CURRENT_USER, CURRENT_ROLE, SESSION_USER, SYSTEM_USER, CURRENT_CATALOG, CURRENT_SCHEMA, and CURRENT_PATH is character string. Whether the character string is fixed-length or variable-length, and its length if it is fixed-length or maximum length if it is variable-length, are implementation-defined. The character set of the character string is SQL_IDENTIFIER. The declared type collation is the character set collation of SQL_IDENTIFIER, and the collation derivation is implicit.
- The value specified by CURRENT_PATH is a <schema name list> where the <catalog name>s are <delimited identifier>s and the <unqualified schema name>s are <delimited identifier>s. Each <schema name> is separated from the preceding <schema name> by a <comma> with no intervening <space>s. The schemas referenced in this <schema name list> are those referenced in the SQL-path of the current SQL-session context, in the order in which they appear in that SQL-path.
10.3 <path specification>
Function
Specify an order for searching for an SQL-invoked routine.
Format
<path specification> ::= PATH <schema name list>
<schema name list> ::= <schema name> [ { <comma> <schema name> }... ]
<schema name> ::= [ <catalog name> <period> ] <unqualified schema name>
<unqualified schema name> ::= <identifier>
<catalog name> ::= <identifier>
Syntax Rules
- No two <schema name>s contained in <schema name list> shall be equivalent.
19.8 <set path statement>
Function
Set the SQL-path used to determine the subject routine of <routine invocation>s with unqualified <routine name>s in <preparable statement>s that are prepared in the current SQL-session by an <execute immediate statement> or a <prepare statement> and in <direct SQL statement>s that are invoked directly. The SQL-path remains the current SQL-path of the SQL-session until another SQL-path is successfully set.
Format
<set path statement> ::= SET <SQL-path characteristic>
<SQL-path characteristic> ::= PATH <value specification>
Syntax Rules
- The declared type of the <value specification> shall be a character string type.
Access Rules
None.
General Rules
- Let S be the <value specification> and let V be the character string that is the value of TRIM ( BOTH ' ' FROM S ).
  a) If V does not conform to the Format and Syntax Rules of a <schema name list>, then an exception condition is raised: invalid schema name list specification.
  b) The SQL-path of the current SQL-session is set to V.
NOTE 724 — A <set path statement> that is executed between a <prepare statement> and an <execute statement> has no effect on the prepared statement.
Conformance Rules
- Without Feature S071, "SQL paths in function and type name resolution", conforming SQL language shall not contain a <set path statement>.
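A brief sketch of what the requested syntax would look like in a session (illustrative only; the catalog and schema names are made up):
SET PATH hive.default, memory.default;
SELECT current_path;
-- per 6.4 above, the value is a <schema name list> of delimited identifiers,
-- e.g. "hive"."default","memory"."default";
-- unqualified routine names would then be resolved by searching these schemas in order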
@martint, @electrum, @dain Can you share your experience with stress testing? What do you think needs to be tested:
Are there any companies that would participate in this to get "real" data and queries, or even hardware?
I was thinking about spinning up a cluster (or clusters) on AWS and running the above frequently (like once a week). It would be good to do this from the beginning, because right now we don't have many such tests, so it should be easy to automate. Then we could iteratively improve it.
In terms of syntax, this is what the spec says:
A <query expression> can also optionally contain a <result offset clause>,
which may limit the cardinality of the derived table by removing a specified
number of rows from the beginning of the derived table. If a <query expression>
contains both an <order by clause> and a <result offset clause>, then the rows
in the derived table are first sorted according to the <order by clause> and then
limited by dropping the number of rows specified in the <result offset clause>
from the beginning of the result produced by the <query expression>. If the
cardinality of the result of an evaluation of a <query expression> is less than
the offset value specified by a <result offset clause>, then the derived table is empty.
And
<query expression> ::=
[ <with clause> ] <query expression body>
[ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ]
<result offset clause> ::=
OFFSET <offset row count> { ROW | ROWS }
<fetch first clause> ::=
FETCH { FIRST | NEXT } [ <fetch first quantity> ] { ROW | ROWS } { ONLY | WITH TIES }
<fetch first quantity> ::=
<fetch first row count>
| <fetch first percentage>
<offset row count> ::=
<simple value specification>
<fetch first row count> ::=
<simple value specification>
<fetch first percentage> ::=
<simple value specification> PERCENT
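For reference, the grammar above corresponds to queries like the following (a sketch against the TPC-H connector; WITH TIES and PERCENT are part of the grammar but would be separate work):
SELECT name
FROM tpch.tiny.nation
ORDER BY name
OFFSET 5 ROWS
FETCH FIRST 10 ROWS ONLY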
The concat function allows the following forms:
concat(array(T), array(T))
concat(T, array(T))
concat(array(T), T)
This causes problems when one of the arguments is array[].
If T is array(U) in the second form, the invocation looks like concat(array(U), array(array(U))). A call like concat(array[1,2], array[]) is ambiguous, since array[] is a valid value for both array(T) and array(array(U)). A similar issue happens for the third form.
For example:
ARRAY[1,2,3] || ARRAY[]
satisfies both:
- array(e) || array(e), where e is integer
- e || array(e), where e is array(integer)
We originally added it for convenience, but, in hindsight, it turns out to have been a mistake. We should leave only the variants that accept two or more arrays and introduce dedicated functions for single-element append/prepend.
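A sketch of the proposed direction; the dedicated function names below are hypothetical, not existing functions:
SELECT concat(ARRAY[1, 2], ARRAY[3, 4])   -- keep: array-to-array concatenation stays unambiguous
SELECT array_append(ARRAY[1, 2], 3)       -- hypothetical replacement for concat(array(T), T)
SELECT array_prepend(0, ARRAY[1, 2])      -- hypothetical replacement for concat(T, array(T))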
ROUND(SMALLINT '99', -1) should return 100; currently it returns 99.
Method com.facebook.presto.hive.s3.PrestoS3FileSystem#create will check for file existence when overwrite==false. The file existence check will break strict read-after-write S3 consistency, per the S3 documentation:
Amazon S3 provides read-after-write consistency for PUTS of new objects in
your S3 bucket in all regions with one caveat. The caveat is that if you make a
HEAD or GET request to the key name (to find if the object exists) before
creating the object, Amazon S3 provides eventual consistency for read-after-write.
The existence check itself is weak, as the file might be created while data is gathered in the staging directory for upload.
I propose to remove the check, or to use a LIST S3 request instead, which does not break read-after-write consistency.
FYI: @findepi
From the spec:
<array value constructor> ::=
<array value constructor by enumeration>
| <array value constructor by query>
<array value constructor by query> ::=
ARRAY <table subquery>
- If <array value constructor by query> is specified, then:
  a) The <query expression> QE simply contained in the <table subquery> shall be of degree 1 (one). Let ET be the declared type of the column in the result of <query expression>.
  b) The declared type of the <array value constructor by query> is array with element type ET and maximum cardinality equal to the implementation-defined maximum cardinality IMDC for such array types.
- The result of <array value constructor by query> is determined as follows:
  a) QE is evaluated, producing a table T. Let N be the number of rows in T.
  b) If N is greater than IMDC, then an exception condition is raised: data exception — array data, right truncation.
  c) The result of <array value constructor by query> is an array of N elements such that for all i, 1 (one) ≤ i ≤ N, the value of the i-th element is the value of the only column in the i-th row of T.
NOTE 167 — The ordering of the array elements is effectively determined by the General Rules of Subclause 7.13, "<query expression>".
In simple terms,
SELECT ARRAY (SELECT x FROM t)
is equivalent to
SELECT (SELECT array_agg(x) FROM t)
See below.
- current_date, not 1970-01-01 (see prestodb/presto#10103 (comment) & following)
- TIME WITH TIME ZONE (= #191, more details there)
- TIMESTAMP should be matched to Presto TIMESTAMP WITH TIME ZONE (??)
- should from_unixtime, to_unixtime continue to work with TIMESTAMP?
- catch Exception for checking if input is parsable in DateTimeUtils
(Note: since GH doesn't send updates for edits, when adding a bullet please be sure to add a comment too.)
It seems like the meaning of the TIMESTAMP and TIMESTAMP WITH TIME ZONE datatypes in Presto is not at all what the SQL standard specifies (and what other databases do).
This is my understanding of the SQL 2003 standard (4.6.2 Datetimes):
TIMESTAMP WITH TIME ZONE represents an absolute point in time. Typically databases store it internally as seconds since the epoch in some fixed time zone (usually UTC). When querying TIMESTAMP WITH TIME ZONE data, the values are presented to the user in the session time zone (the session time zone is used just for presentation purposes).
TIMESTAMP does not represent a specific point in time, but rather a reading of a wall clock + calendar. Selecting values from a TIMESTAMP column should return the same result set no matter what the client's session time zone is.
Presto's semantics are different:
TIMESTAMP seems to do what TIMESTAMP WITH TIME ZONE should.
TIMESTAMP WITH TIME ZONE encodes explicit time zone information into each value stored in the table. The SQL standard does not define a type like that, and it does not seem very practical: assuming that values selected from TIMESTAMP WITH TIME ZONE are presented to the user in the session time zone anyway, the per-row time zone information can be stripped away and all values can be stored in some arbitrary fixed time zone (e.g. UTC).
Please comment on the semantics - it seems wrong. Why this choice? It is hard to believe that it was not done intentionally.
@losipiuk I agree with you that Timestamp w/o TZ in Presto is broken. I do NOT agree that Timestamp w/ TZ should behave like Instant. I believe it should also have an associated time zone. (In other words, I believe Timestamp w/ TZ is implemented correctly today.) Below is an excerpt of something I wrote early last year that summarizes the current behavior and my understanding.
To summarize how things work today:
The way I understand it
Here is the reason I believe the first understanding is inconsistent. I can only think of one possible interpretation for the other 3 concepts:
Note here the inconsistency between the interpretation of Timestamp w/o TZ and Time w/o TZ if we adopt the first interpretation of Timestamp w/o TZ (Instant vs LocalTime), whereas under the second interpretation it is consistent (LocalDateTime vs LocalTime).
I went to the SQL spec for the definitive answer:
I believe these two rules prove that the SQL spec agrees with my interpretation. Let's consider the cast from Timestamp w/o TZ to Timestamp w/ TZ.
Under the first interpretation, these two casts should yield results that are equal. Under the second interpretation, they would produce different results. The rule in the SQL spec produces two different results.
Lastly, a side note from me. Both interpretations can produce results that depend on the user session time zone:
Under the SQL spec, a cast from timestamp w/o TZ to timestamp w/ TZ can produce different results based on the user time zone. As a result, I guess this cast probably should NOT have been implicit.
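A minimal sketch of that environment dependence (illustrative; the resulting instant depends on the session time zone):
SELECT CAST(TIMESTAMP '2016-01-01 12:00:00' AS TIMESTAMP WITH TIME ZONE)
-- the local date-time is interpreted in the session time zone,
-- so the same query yields different points in time for different users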
@losipiuk, @dain, and I reached agreement:
- Timestamp with Timezone in Presto is implemented properly today (like DateTime in joda, ZonedDateTime in jdk8).
- Timestamp in Presto is like Instant in joda/jdk8 today. It should be like LocalDateTime in joda/jdk8: e.g. extracting the hour from 2016-01-01 12:00:00 <TZ> should return 12 no matter what <TZ> is put in the template.
- For Timestamp in Presto, we should remove the implicit coercion from Timestamp to Timestamp with Timezone because the result value is environment dependent.
Ticket migrated from prestodb/presto#7122, prestodb/presto#10326
presto> use memory.default;
USE
presto:default> explain create table x as select 1 c;
Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------
- Output[rows] => [rows:bigint]
- TableCommit[memory:INSTANCE:MemoryOutputTableHandle{table=MemoryTableHandle{connectorId=memory, schemaName=default, tableName=x, tableId=0, columnHandles=...
- LocalExchange[SINGLE] () => partialrows:bigint, fragment:varbinary
- RemoteExchange[GATHER] => partialrows:bigint, fragment:varbinary
- TableWriter => [partialrows:bigint, fragment:varbinary]
c := expr
..................
[...]
presto:default> explain create table x as select 1 c;
Query 20180205_101139_00002_ijv9f failed: line 1:9: Destination table 'memory.default.x' already exists
select * from x;
c
---
(0 rows)
EXPLAIN renders the query plan without actually running the query, so EXPLAIN CREATE TABLE ... AS SELECT ... should do just that.
Currently, it creates the table (but doesn't populate it).
It can help for computing or filtering on the raw partition names that are provided by other Hive-related tools, e.g. ds=2017-05-23/type=abc/ts=2017-05-23+21%3A45%3A99.
It should support pushdown for partition pruning.
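A sketch of the intended usage; the hidden column name ("$partition") and the table are assumptions for illustration, not a committed interface:
SELECT count(*)
FROM hive.web.page_views
WHERE "$partition" = 'ds=2017-05-23/type=abc/ts=2017-05-23+21%3A45%3A99'
-- the predicate on the raw partition name should be pushed down for partition pruning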
MySQL's information_schema.columns table exposes a column_comment column, while Presto exposes a comment column; see https://dev.mysql.com/doc/refman/5.7/en/columns-table.html
I couldn't find this column in ANSI SQL, nor in Oracle, Teradata, or MSSQL.
It looks like Hue depends on this column: https://github.com/cloudera/hue/blob/master/desktop/libs/notebook/src/notebook/connectors/jdbc.py#L213
This column was added to Presto about 5 years ago (with prestodb/presto#1214). In the original PR I do not see any discussion about compliance with existing database systems or ANSI SQL.
It should be possible to write queries like:
SELECT *
FROM t LEFT JOIN LATERAL (...) u ON ...
SELECT *
FROM t LEFT JOIN UNNEST(t.a) u ON ...
This is an umbrella issue to track various projects related to allowing connectors to participate in query optimization. The long-term vision is for plugins to provide custom rules that can transform subplans into plugin-specific operations. This requires a set of steps:
In the short term, we can introduce special-purpose mechanisms to the SPI and engine to enable the following behaviors:
Document describing the high-level approach: https://github.com/prestosql/presto/wiki/Pushdown-of-complex-operations
Plan:
make at_timezone public (see @dain's prestodb/presto#5162 (comment)) and add documentation
This is required to perform AT TIME ZONE conversion when the zone is not a constant.
See #135 (comment) below for what should be implemented in this issue.
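A sketch of why the function form is needed (table and column names below are made up): AT TIME ZONE only accepts a constant zone, while at_timezone could take the zone from data.
SELECT at_timezone(order_ts, customer_tz)   -- zone comes from a column, not a literal
FROM orders_with_zones
-- compare: order_ts AT TIME ZONE 'America/Los_Angeles' requires a constant zone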
Support Roles, Grants, etc.
The system.create_empty_partition procedure fails because the source directory does not exist (no files were written) and thus the rename fails. SemiTransactionalHiveMetastore.Committer#prepareAddPartition should probably unconditionally create currentPath before checking if it is equal to targetPath.
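For context, a call of the following shape is what fails (schema, table, and partition values are illustrative):
CALL system.create_empty_partition(
    schema_name => 'web',
    table_name => 'page_views',
    partition_columns => ARRAY['ds'],
    partition_values => ARRAY['2019-01-01'])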
Optimize queries of the following shape:
SELECT *
FROM (
SELECT k, agg1, agg2
FROM t
GROUP BY k
) a
JOIN (
SELECT k, agg3, agg4
FROM t
GROUP BY k
) b
ON (a.k = b.k)
as
SELECT k, agg1, agg2, agg3, agg4
FROM T
GROUP BY k
More generally, if we know k contains distinct values, optimize
SELECT *
FROM (
SELECT k, v1, v2
FROM t
) a
JOIN (
SELECT k, v3, v4
FROM t
) b
ON (a.k = b.k)
as
SELECT k, v1, v2, v3, v4
FROM t
Incorporated from: prestodb/presto#11959 (discussion there)
tpch_sf100.orders (partitioned):
presto> show stats for hive.tpch_sf100_orc_part.orders;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
---------------+---------------------+-----------------------+----------------+----------------+------------+------------
orderkey | NULL | 62881.0 | 0.0 | NULL | 162 | 599999942
custkey | NULL | 62881.0 | 0.0 | NULL | 2 | 14999998
orderstatus | 1.4999663244E8 | 3.0 | 0.0 | NULL | NULL | NULL
totalprice | NULL | 62881.0 | 0.0 | NULL | 851.17 | 571417.48
orderpriority | 1.25999070546E9 | 5.0 | 0.0 | NULL | NULL | NULL
clerk | 2.2499494866E9 | 49276.0 | 0.0 | NULL | NULL | NULL
shippriority | NULL | 1.0 | 0.0 | NULL | 0 | 0
comment | 7.275173681759999E9 | 62881.0 | 0.0 | NULL | NULL | NULL
orderdate | NULL | 2406.0 | 0.0 | NULL | 1992-01-01 | 1998-08-02
NULL | NULL | NULL | NULL | 1.4999663244E8 | NULL | NULL
(10 rows)
presto> show stats for hive.tpch_sf100_orc.orders;
column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
---------------+----------------------+-----------------------+----------------+-----------+------------+------------
orderkey | NULL | 1.5E8 | 0.0 | NULL | 1 | 600000000
custkey | NULL | 1.0165457E7 | 0.0 | NULL | 1 | 14999999
orderstatus | 1.5E8 | 3.0 | 0.0 | NULL | NULL | NULL
totalprice | NULL | 3.4038282E7 | 0.0 | NULL | 811.73 | 591036.15
orderdate | NULL | 2449.0 | 0.0 | NULL | 1992-01-01 | 1998-08-02
orderpriority | 1.2600248040000002E9 | 5.0 | 0.0 | NULL | NULL | NULL
clerk | 2.25E9 | 100816.0 | 0.0 | NULL | NULL | NULL
shippriority | NULL | 1.0 | 0.0 | NULL | 0 | 0
comment | 7.275038709E9 | 1.12463191E8 | 0.0 | NULL | NULL | NULL
NULL | NULL | NULL | NULL | 1.5E8 | NULL | NULL
Should we extrapolate NDVs instead? It seems that partitions might often be different chunks of data, so that NDVs don't overlap.
Alternatively, we could store the HLL state per column as an auxiliary partition property and calculate the extrapolation based on merged HLLs.
Greetings,
Following #18, we have created a simple partial aggregation push-down prototype by creating a new PlanOptimizer. It was added at the end of the optimizers' list (here), matching the partial AggregationNode and passing down the aggregations (specified by their FunctionCall qualified name and the relevant column handles) into the ConnectorTableLayoutHandle (as is currently done for predicate pushdown by the PickLayouts optimizer) of the underlying ScanNode.
Currently, we keep the original partial aggregation node in the query plan, since we don't support all aggregations in our connector and would like to fall back to Presto implementation for unsupported aggregations.
In order to keep the implementation generic, we have added a new API to the Metadata interface for optionally adding an aggregation into an existing TableLayoutHandle from the new optimizer.
We would like to ask whether our approach makes sense.
If so, we would like to help with the effort of adding this feature (a partial aggregation pushdown API) to mainline Presto.
Also, we were not sure whether we are adding the new optimizer at the correct stage of PlanOptimizers. Maybe we should make it part of PickLayouts?
4.10.3 Multisets
A multiset is an unordered collection. Since a multiset is unordered, there is no ordinal position to reference individual elements of a multiset.
A multiset type is a <collection type>. If MT is some multiset type with element type EDT, then every value of MT is a multiset of EDT.
Let M1 and M2 be multisets of EDT. M1 and M2 are identical if and only if M1 and M2 have the same cardinality n, and for each element x in M1, the number of elements of M1 that are identical to x, including x itself, equals the number of elements of M2 that are identical to x.
Let n1 be the cardinality of M1 and let n2 be the cardinality of M2. M1 is a submultiset of M2 if, for each element x of M1, the number of elements of M1 that are not distinct from x, including x itself, is less than or equal to the number of elements of M2 that are not distinct from x.
Queries of the following shape:
SELECT
a1, ..., an,
agg1(...),
agg2(...)
FROM t
WHERE f1(...)
UNION ALL
SELECT
b1, ..., bn,
agg3(...),
agg4(...)
FROM t
WHERE f2(...)
...
(a1, ..., an, b1, ..., bn are constants)
can be optimized to process all the data in a single pass to reduce the number of stages in the query execution.
One possible rewrite could be:
SELECT *
FROM UNNEST( (
    SELECT ARRAY[
        ROW(a1, ..., an, agg1(...) FILTER (WHERE f1(...)), agg2(...) FILTER (WHERE f1(...))),
        ROW(b1, ..., bn, agg3(...) FILTER (WHERE f2(...)), agg4(...) FILTER (WHERE f2(...))),
        ...]
    FROM t) ) u
fact_table - a large fact table, partitioned on date_key
d_date - a small dimension table, with significant correlation between its columns and date_key.
Currently for the following query:
SELECT
count(*)
FROM
fact_table a
JOIN d_date b
ON a.date_key = b.date_key
WHERE
b.year = 2017;
the larger probe table is fully scanned. After scanning the build table (right side), Presto could extract the information that for b.year = 2017 there is only a small number of matching b.date_key values. This information could be used to narrow down the table scan. In other words, the above query could be dynamically "rewritten" to the equivalent of:
SELECT
count(*)
FROM
fact_table a
JOIN d_date b
ON a.date_key = b.date_key
WHERE a.date_key IN (20170101,20170102, etc...)
Design doc: https://docs.google.com/document/d/1TOlxS8ZAXSIHR5ftHbPsgUkuUA-ky-odwmPdZARJrUQ/edit
Add an optimization rule to remove unnecessary DISTINCT in this scenario:
SELECT ...
FROM t
WHERE c IN (SELECT DISTINCT ... FROM u)
Since the semi join already performs deduplication of the values in the subquery, that operation is unnecessary and results in an extra exchange and aggregation.
presto> explain (type distributed) select count(*) from tpch.tiny.orders where custkey in (select custkey from tpch.tiny.customer);
Query Plan
-----------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [count]
Output partitioning: SINGLE []
- Output[_col0] => [count:bigint]
_col0 := count
- Aggregate(FINAL) => [count:bigint]
count := "count"("count_13")
- LocalExchange[SINGLE] () => count_13:bigint
- RemoteSource[1] => [count_13:bigint]
Fragment 1 [HASH]
Output layout: [count_13]
Output partitioning: SINGLE []
- Aggregate(PARTIAL) => [count_13:bigint]
count_13 := "count"(*)
- FilterProject[filterPredicate = "expr_8"] => []
- SemiJoin[custkey = custkey_1] => [custkey:bigint, expr_8:boolean]
- RemoteSource[2] => [custkey:bigint]
- LocalExchange[SINGLE] () => custkey_1:bigint
- RemoteSource[3] => [custkey_1:bigint]
Fragment 2 [tpch:orders:15000]
Output layout: [custkey]
Output partitioning: HASH [custkey]
- TableScan[tpch:tpch:orders:sf0.01, originalConstraint = true] => [custkey:bigint]
custkey := tpch:custkey
Fragment 3 [SOURCE]
Output layout: [custkey_1]
Output partitioning: HASH (replicate nulls) [custkey_1]
- TableScan[tpch:tpch:customer:sf0.01, originalConstraint = true] => [custkey_1:bigint]
custkey_1 := tpch:custkey
presto> explain (type distributed) select count(*) from tpch.tiny.orders where custkey in (select distinct custkey from tpch.tiny.customer);
Query Plan
-------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [count]
Output partitioning: SINGLE []
- Output[_col0] => [count:bigint]
_col0 := count
- Aggregate(FINAL) => [count:bigint]
count := "count"("count_13")
- LocalExchange[SINGLE] () => count_13:bigint
- RemoteSource[1] => [count_13:bigint]
Fragment 1 [HASH]
Output layout: [count_13]
Output partitioning: SINGLE []
- Aggregate(PARTIAL) => [count_13:bigint]
count_13 := "count"(*)
- FilterProject[filterPredicate = "expr_8"] => []
- SemiJoin[custkey = custkey_1] => [custkey:bigint, expr_8:boolean]
- RemoteSource[2] => [custkey:bigint]
- LocalExchange[SINGLE] () => custkey_1:bigint
- RemoteSource[3] => [custkey_1:bigint]
Fragment 2 [tpch:orders:15000]
Output layout: [custkey]
Output partitioning: HASH [custkey]
- TableScan[tpch:tpch:orders:sf0.01, originalConstraint = true] => [custkey:bigint]
custkey := tpch:custkey
Fragment 3 [HASH]
Output layout: [custkey_1]
Output partitioning: HASH (replicate nulls) [custkey_1]
- Aggregate(FINAL)[custkey_1] => [custkey_1:bigint]
- LocalExchange[HASH] ("custkey_1") => custkey_1:bigint
- RemoteSource[4] => [custkey_1:bigint]
Fragment 4 [SOURCE]
Output layout: [custkey_1]
Output partitioning: HASH [custkey_1]
- Aggregate(PARTIAL)[custkey_1] => [custkey_1:bigint]
- TableScan[tpch:tpch:customer:sf0.01, originalConstraint = true] => [custkey_1:bigint]
custkey_1 := tpch:custkey
Currently, the list of concrete grouping sets is expanded during analysis. This results in inefficiencies during analysis and planning due to redundant re-analysis and storage of the atoms of each grouping element (it allocates 2^#columns sets of expressions when analyzing the query. It then analyzes each of those expressions and creates a GroupID node with that many entries).
Instead, the engine should only analyze the atomic units (columns of each grouping element) and should encode the expected grouping sets in the plan using an abstract description of the components instead of enumerating every combination of grouping columns. The enumeration should happen when the operator is instantiated.
I.e., the GroupId node should contain a list of grouping elements:
Elements = (CUBE set<symbol> | ROLLUP list<symbol> | SETS list<set<symbol>>)+
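For scale, a sketch of how quickly the current enumeration grows (plain SQL against the TPC-H connector, for illustration):
SELECT orderstatus, orderpriority, shippriority, clerk, count(*)
FROM tpch.tiny.orders
GROUP BY CUBE (orderstatus, orderpriority, shippriority, clerk)
-- today the CUBE is expanded during analysis into 2^4 = 16 explicit grouping sets;
-- the proposal is to keep the abstract CUBE description in the plan and expand it in the operator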
Maybe all Hive users want this feature. It takes a huge amount of work to rewrite Hive scripts, and Hive's market (ETL) is huge. Why not consider fully supporting Hive? This would be more conducive to the adoption of Presto. Presto could add an indication that enabling this feature will reduce performance.
Related: prestodb/presto#10888
Extracted from: prestodb/presto#10305 (see discussion there)
Change partial aggregation to detect ineffective partial aggregations and switch to a mode where the raw input is output instead of intermediate results. In addition, the partial should be able to make this decision on a per-group basis so groups with a high row count can still get the benefits of partial aggregations.
For distinct aggregations in groups that are performing partial aggregation, the operator would determine if a row is distinct for any of the aggregates, and if so, the raw input is output, along with flag columns that mark which aggregates the row is distinct for.
The net result is the aggregate may output rows for three distinct cases:
The output descriptor for a partial aggregation would look something like this:
- row_type: either intermediate, raw, or distinct partial
- the intermediate aggregation results, null if the row type is not intermediate
- the raw input columns, null if row_type is intermediate; for the distinct row_type, only inputs to distinct aggregations may be present
- the distinct flag columns, null if row_type is not distinct
Incorporated from: prestodb/presto#11615
Currently we don't have any quantifiable means of measuring stats cost model quality. It would be great to have one so that we can compare stats/cost calculator changes via A/B testing or by aggregating historical data.
Cumulative cost quality
Let's call the cumulative estimated cost of query q: EstimatedUserCpu(q), EstimatedMemory(q), EstimatedNetwork(q).
Let's call the actual query cost: CostCpu(q), CostMemory(q), CostNetwork(q).
All of those are random variables with respect to q.
Observations:
- f(EstimatedXXX(q)) = CostXXX(q) + E(q), where E(q) is the estimate error.
- f should be a linear transformation (otherwise the sum of subplan estimates won't be the estimate of the whole plan).
- E(q) should be proportional to CostXXX(q) or EstimatedXXX(q). This is because we expect larger errors for bigger costs/estimates.
Let's say we gather queries qi and their estimates EstimatedXXX(qi) and costs CostXXX(qi). Thus we have data points composed of pairs:
<x1=EstimatedXXX(q1), y1=CostXXX(q1)>, ..., <xN=EstimatedXXX(qN), yN=CostXXX(qN)>, for i = 1...N.
Those points will look similar to:
Let's transform those points by dividing yi by xi. We now get:
<x1=EstimatedXXX(q1), y1'=CostXXX(q1)/EstimatedXXX(q1)>, ..., <xN=EstimatedXXX(qN), yN'=CostXXX(qN)/EstimatedXXX(qN)>
Now those points will look like:
We can now perform regression analysis, as we should have satisfied the conditions for performing OLS regression (errors come from the same distribution and have constant variance).
We can derive quantifiable model properties like:
Such metrics allow us to compare the quality of cost estimates.
Note: I think that there will be different error distributions depending on how complex the plans are. For instance, sophisticated plans with many plan nodes will have the error accumulated. To account for that, we could derive plan complexity (e.g. number of nodes, depth) as an additional input parameter to the model and perform the analysis for each complexity level.
Individual operator cost quality
Let's call the estimated cost of operator o: OperatorEstimatedUserCpu(o), OperatorEstimatedMemory(o), OperatorEstimatedNetwork(o).
Let's call the actual operator cost: OperatorCostCpu(o), OperatorCostMemory(o), OperatorCostNetwork(o).
Let's call the operator estimated input data size: EstimatedInputSize(o).
Let's call the operator measured input data size: MeasuredInputSize(o).
All of those are random variables with respect to o.
Observations:
- Input data size heavily affects both OperatorEstimatedXXX(o) and OperatorCostXXX(o). Because of that, let's normalize them:
NormalizedOperatorEstimatedXXX(o) = OperatorEstimatedXXX(o)/EstimatedInputSize(o)
NormalizedOperatorCostXXX(o) = OperatorCostXXX(o)/MeasuredInputSize(o)
Intuitively, they tell how much memory is needed by an operator per input byte. We can now define the operator estimation error as:
OperatorEstimationErrorXXX(o) = NormalizedOperatorEstimatedXXX(o) - NormalizedOperatorCostXXX(o)
We can now:
- analyze the distribution of OperatorEstimationErrorXXX
- measure the bias of OperatorEstimationErrorXXX, e.g. how much (per input byte) an operator is under/overestimating cost.
- if OperatorEstimationErrorXXX has a normal distribution, we can make more sophisticated conclusions (e.g. if we added a Constant to OperatorEstimatedXXX, then for 90% of queries we would overestimate memory usage by this much).
Note that we should perform such analysis for each operator type, as they have different profiles.
Stats quality
A similar approach can be applied to operator nodes that filter the data (e.g. actual filtering factor vs. estimated filtering factor). This way we can evaluate the quality of FilterStatsCalculator and the rules that perform predicate estimation.
Adjusting model
The initial goal is to provide quality metrics for stats/cost model so that we could test and quantify model changes.
However, having gathered historical query and model data, we could introduce additional variables to our stats/cost computations that we would adjust to achieve desired model properties.
For instance, we could/should:
Having such variables, we could adjust the model for different hardware, data, and workloads. Specifically, we could adjust the model for each client so that the CBO produces better plans.
Plan Of Attack
The steps are as follows:
- Expose the estimates and measured costs in the query info (v1/query) JSON so that they can be gathered and analyzed. This would be used to estimate the model on a predefined set of queries (e.g. TPCH/TPCDS) during benchmarks. Note that operators don't match 1-1 to plan nodes; for instance, to obtain HashAggregationOperator stats we would need to get stats for the join build side.
- Expose the same data via an EventListener so that we could gather and store historical data points.