protryon / klickhouse
Rust crate for accessing Clickhouse
License: Apache License 2.0
Does the serde feature allow serializing and deserializing the Value struct?
I noticed a few of the read_i32 and write_i32 calls don't force the little-endian variants. Since I just found this by code inspection and my platform is little-endian, I can't easily test whether it's an actual bug or not. I just wanted to flag these in case they were an oversight.
diff --git a/klickhouse/src/block.rs b/klickhouse/src/block.rs
index 24911d9..894bed8 100644
--- a/klickhouse/src/block.rs
+++ b/klickhouse/src/block.rs
@@ -37,7 +37,7 @@ impl BlockInfo {
new.is_overflows = reader.read_u8().await? != 0;
}
2 => {
- new.bucket_num = reader.read_i32().await?;
+ new.bucket_num = reader.read_i32_le().await?;
}
field_num => {
return Err(KlickhouseError::ProtocolError(format!(
@@ -56,7 +56,7 @@ impl BlockInfo {
.write_u8(if self.is_overflows { 1 } else { 2 })
.await?;
writer.write_var_uint(2).await?;
- writer.write_i32(self.bucket_num).await?;
+ writer.write_i32_le(self.bucket_num).await?;
writer.write_var_uint(0).await?;
Ok(())
}
diff --git a/klickhouse/src/internal_client_in.rs b/klickhouse/src/internal_client_in.rs
index 94d419a..85874a4 100644
--- a/klickhouse/src/internal_client_in.rs
+++ b/klickhouse/src/internal_client_in.rs
@@ -34,7 +34,7 @@ impl<R: ClickhouseRead> InternalClientIn<R> {
}
async fn read_exception(&mut self) -> Result<ServerException> {
- let code = self.reader.read_i32().await?;
+ let code = self.reader.read_i32_le().await?;
let name = self.reader.read_string().await?;
let message = self.reader.read_string().await?;
let stack_trace = self.reader.read_string().await?;
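To make the byte-order concern concrete, here is a minimal sketch using only the standard library. The helper names are hypothetical and are not part of klickhouse. One assumption worth checking: if the read_i32 and write_i32 calls resolve to tokio's AsyncReadExt and AsyncWriteExt methods, those use big-endian order on every platform, so a little-endian host would not mask the mismatch.

```rust
/// Encode an i32 in little-endian order (hypothetical helper, for illustration only).
fn encode_i32_le(value: i32) -> [u8; 4] {
    value.to_le_bytes()
}

/// Decode four bytes as a little-endian i32.
fn decode_i32_le(bytes: [u8; 4]) -> i32 {
    i32::from_le_bytes(bytes)
}

/// Decode the same four bytes as a big-endian i32.
fn decode_i32_be(bytes: [u8; 4]) -> i32 {
    i32::from_be_bytes(bytes)
}
```

For instance, the value 1000 encoded little-endian is the byte sequence E8 03 00 00; decoding those same bytes as big-endian yields a different number, which is the kind of mismatch the diff above guards against.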
It would be nice to be able to selectively generate some trait implementations that provide insert/query functions.
The Row trait allows serializing a Rust struct to and from Clickhouse, e.g.
#[derive(klickhouse::Row)]
struct User {
age: u8,
name: String
}
let users: Vec<User> = ch.query_collect("SELECT age, name from users").await?;
In this example, let's assume we now want to retrieve user details and account balance in the same query. We would like to do something like
let users: Vec<(u32, User)> = ch.query_collect("SELECT credits, (age, name) FROM ...").await?;
// or
#[derive(klickhouse::Row)]
struct Row {
#[klickhouse(flatten)]
user: User,
credits: u32
}
let users: Vec<Row> = ch.query_collect("SELECT age, name, credits FROM ...").await?;
i.e. composing with the existing implementation of User: Row.
Neither approach is currently supported:
- Row is not automatically available on tuples of types implementing Row. This would allow using (UnitValue<u32>, User) as the query return type.
- The derive macro supports some field attributes (default, rename, skip...), but not flatten.
Rather, one has to manually implement Row (subject to some issues described below).
Row on composed types
For the first approach, one would implement Row on tuples implementing Row.
For (T1, ..., Tn), one would either:
- ... T1 and T2.
flatten support
For serialization and deserialization, one would:
- call {serialize, deserialize}_row of the subfield, setting (resp. retrieving) the respective fields.
One minor annoyance is the signature
fn deserialize_row(map: Vec<(&str, &Type, Value)>) -> Result<Self>;
which does not allow efficiently retrieving the fields to pass to the recursive deserialize_row calls (i.e. one needs to linearly search for them and swap the Value out). This problem would disappear if the signature was
fn deserialize_row(map: IndexMap<String, (Type, Value)>) -> Result<Self>;
which is actually a pretty simple change in the code, but a breaking API change. The inefficiency probably does not matter with a small number of fields.
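The difference between the two signatures can be sketched with stand-in types. Everything below is hypothetical: Value is reduced to three variants, std's HashMap stands in for IndexMap, and take_linear and take_keyed are illustrative helpers rather than klickhouse functions.

```rust
use std::collections::HashMap;

/// Stand-in for klickhouse's Value, reduced to what the sketch needs.
#[derive(Debug, Clone, PartialEq, Default)]
enum Value {
    #[default]
    Null,
    UInt8(u8),
    String(String),
}

/// Vec-based shape: linearly search for the field, then swap its Value out,
/// leaving a Null placeholder behind.
fn take_linear(fields: &mut Vec<(&str, Value)>, name: &str) -> Option<Value> {
    let slot = fields.iter_mut().find(|field| field.0 == name)?;
    Some(std::mem::take(&mut slot.1))
}

/// Map-based shape: remove the entry directly by key.
fn take_keyed(fields: &mut HashMap<String, Value>, name: &str) -> Option<Value> {
    fields.remove(name)
}
```

With the Vec, each nested struct pays one linear scan per field; with a keyed map, each lookup is a single hash probe, at the cost of the breaking signature change mentioned above.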
Another approach is to allow deriving
#[derive(klickhouse::Row)]
struct Row {
user: User,
credits: u32
}
where the User fields are serialized and deserialized with a user prefix, e.g.
SELECT name AS "user.name", age AS "user.age", credits FROM ...
Either implicitly, or with another attribute on the user: User field.
I have a working draft for the flatten approach, and I might have a go at the tuple approach too.
Before cleaning this up and submitting a PR, I wanted to ask whether you had any comments or thoughts about this?
I only recently started using clickhouse and this (very nice) crate, and I might be missing some subtleties.
Hey, thanks for working on this crate. I've run into an issue where DEFAULT columns are not getting set properly.
To reproduce the issue, you can follow these steps:
create table test (id UInt32, id_bump DEFAULT id+1) ENGINE = MergeTree() ORDER BY id;
// main.rs
#[derive(Row, Debug, Default)]
pub struct Test {
id: u32,
}
// glue code
#[tokio::main]
async fn main() {
// init client
let row = Test {
id: 1,
};
client
.insert_native_block("INSERT INTO test FORMAT native", vec![row])
.await
.unwrap();
}
On querying the table, I see that the default expression (id+1) set for column id_bump is not being honored. Any idea why this may be?
Observed:
SELECT *
FROM test
┌─id─┬─id_bump─┐
│ 1 │ 0 │
└────┴─────────┘
Expected:
SELECT *
FROM test
┌─id─┬─id_bump─┐
│ 1 │ 2 │
└────┴─────────┘
cc: @tekjar
The Type::validate method has a check that rejects multidimensional arrays with more than 2 dimensions, e.g. Array(Array(Array(T))): https://github.com/Protryon/klickhouse/blob/master/klickhouse/src/types/mod.rs#L811
Comparing
https://github.com/Protryon/klickhouse/blob/master/klickhouse/src/types/deserialize/array.rs#L25
with the Python implementation
https://github.com/ClickHouse/clickhouse-connect/blob/main/clickhouse_connect/datatypes/container.py#L37
I was not able to find what breaks down for 3 or more dimensions. The Python implementation is more efficient in that it reads all the data at once rather than recursing, but this is all I see for now.
The following test passes both with integers and with strings as inner type (after commenting out the check):
#[derive(Debug, Clone, klickhouse::Row, PartialEq)]
struct Row {
a: Vec<Vec<Vec<u8>>>,
}
#[tokio::test]
async fn test_3d_array() {
let client = super::get_client().await;
let items = vec![
Row {
a: vec![
vec![vec![1, 2], vec![3], vec![4, 5, 6]],
vec![vec![7], vec![8, 9]],
],
},
Row {
a: vec![
vec![vec![10], vec![11, 12, 13], vec![14]],
vec![vec![15, 16], vec![17]],
vec![vec![18]],
],
},
];
client
.execute("drop table if exists test_serialize")
.await
.unwrap();
client
.execute(
r"
CREATE TABLE test_serialize (
a Array(Array(Array(UInt8)))
) ENGINE = Memory;
",
)
.await
.unwrap();
client
.insert_native_block("insert into test_serialize format native", items.clone())
.await
.unwrap();
let items2 = client
.query_collect("SELECT * from test_serialize")
.await
.unwrap();
assert_eq!(items, items2);
}
Do you remember what the reason for this restriction was?
The current release (v0.8.8) is broken when the Rust type is Vec<u8> or a u8 slice.
Storing a hex-encoded hash works fine with FixedString(64), but trying to store the raw hash in a FixedString(32) (to save space) panics:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: TypeParseError("could not assign value 'Array([UInt8(212), UInt8(229), UInt8(103), UInt8(64), UInt8(248), UInt8(118), UInt8(174), UInt8(248), UInt8(192), UInt8(16), UInt8(184), UInt8(106), UInt8(64), UInt8(213), UInt8(245), UInt8(103), UInt8(69), UInt8(161), UInt8(24), UInt8(208), UInt8(144), UInt8(106), UInt8(52), UInt8(230), UInt8(154), UInt8(236), UInt8(140), UInt8(13), UInt8(177), UInt8(203), UInt8(143)])' to type 'FixedString(32)'")', src/main.rs:311:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
The current Value struct is using integers instead of floats.
When struct fields are not declared in the same order as in the database, type hints will be passed to the wrong fields.
This affects only structs with fields requiring a type hint (decimal, bytes...) and results in an error from the server.
The Row::serialize_row method signature is
fn serialize_row(
self,
type_hints: &[&Type]
) -> Result<Vec<(Cow<'static, str>, Value)>>;
It is called in client.rs as follows:
let types = first_block.column_types.values().collect::<Vec<_>>();
rows.into_iter()
.map(|x| x.serialize_row(&types[..]))
The Row derive macro implements serialize_row by iterating over fields and indexing types[idx] to get the type hint for each field.
There is however no guarantee that the field order in the struct declaration matches the column order in first_block (returned by the server).
#[derive(klickhouse::Row)]
struct TestRow {
a: klickhouse::Bytes,
b: klickhouse::Bytes,
}
#[tokio::test]
async fn blah() {
let client = get_client().await;
prepare_table(
"test",
"
a Array(UInt8),
b String",
&client,
)
.await;
client
.insert_native_block(
"INSERT INTO test FORMAT Native",
vec![TestRow {
a: vec![].into(),
b: vec![].into(),
}],
)
.await
.unwrap();
}
This test passes, but fails if a and b are switched in the struct declaration:
struct TestRow {
b: klickhouse::Bytes,
a: klickhouse::Bytes,
}
Indeed, b now gets the type hint meant for a.
The solution is pretty simple but requires a change to the Row trait:
In client.rs, column_types is originally an IndexMap in a Block. Rather than passing it as a slice, one should pass the IndexMap, allowing type hints to be safely retrieved by field name.
This would change the signature of serialize_row to
fn serialize_row(
self,
type_hints: &IndexMap<String, Type>,
) -> Result<Vec<(Cow<'static, str>, Value)>>;
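A minimal sketch of why the lookup by name is safer than the lookup by position. The Type enum below is a reduced stand-in, std's HashMap replaces IndexMap to keep the example dependency-free, and hint_by_index and hint_by_name are hypothetical helpers, not what the derive macro actually generates.

```rust
use std::collections::HashMap;

/// Reduced stand-in for klickhouse's Type.
#[derive(Debug, Clone, PartialEq)]
enum Type {
    String,
    UInt8,
    Array(Box<Type>),
}

/// Positional lookup: only correct when the struct's field order
/// matches the server's column order.
fn hint_by_index<'a>(hints: &[&'a Type], field_idx: usize) -> Option<&'a Type> {
    hints.get(field_idx).copied()
}

/// Name-based lookup: independent of the declaration order.
fn hint_by_name<'a>(hints: &'a HashMap<String, Type>, field: &str) -> Option<&'a Type> {
    hints.get(field)
}
```

With server columns (a Array(UInt8), b String) and a struct declaring b first, hint_by_index(.., 0) hands b the Array(UInt8) hint, whereas hint_by_name(.., "b") returns String regardless of the order.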
Hello. I'm a beginner in Rust, so some of what I say may sound naive.
https://stackoverflow.com/questions/61179070/rust-chrono-parse-date-string-parseerrornotenough-and-parseerrortooshort
I want to convert a standard datetime string, "2022-04-22 00:00:00", to a klickhouse::DateTime.
It should work, but it fails. While tracking down the bug step by step, I wrote a test for this.
#[test]
fn test_consistency_with_convert_for_str() {
let test_date = "2022-04-22 00:00:00";
let dt = chrono::NaiveDateTime::parse_from_str(test_date, "%Y-%m-%d %H:%M:%S").unwrap();
let chrono_date =
chrono::DateTime::<Tz>::from_utc(dt, chrono_tz::UTC.offset_from_utc_datetime(&dt));
let date = DateTime(UTC, dt.timestamp() as u32);
let new_chrono_date: chrono::DateTime<Tz> = date.into();
assert_eq!(new_chrono_date, chrono_date);
}
Also, there are already two tests in the file src/values/date.rs. I confirmed that the old From implementation cannot pass this test case.
@Protryon
I'm writing a Kafka consumer that consumes messages and inserts them into Clickhouse according to each message's metadata, like:
{
"table": "ch_table_1",
"data": [
{"col_name": "foo", "type": "uint32", "val": "3"},
{"col_name": "bar", "type": "string", "val": "hello"}
//...
]
}
How do I construct the Row to insert? I didn't find any docs about this.
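As a starting point, the first step (turning the "type"/"val" string pairs into typed cells) might look like the sketch below. It only uses the standard library: DynValue, parse_cell, and parse_row are hypothetical names, and mapping the result onto the crate's own value or block types is deliberately left out because that API is not shown here.

```rust
/// Hypothetical dynamically typed cell; a real consumer would map this
/// onto the client library's own value type.
#[derive(Debug, Clone, PartialEq)]
enum DynValue {
    UInt32(u32),
    String(String),
}

/// Parse one {"type": ..., "val": ...} pair from the message.
fn parse_cell(type_name: &str, val: &str) -> Result<DynValue, String> {
    match type_name {
        "uint32" => val
            .parse::<u32>()
            .map(DynValue::UInt32)
            .map_err(|e| format!("invalid uint32 {val:?}: {e}")),
        "string" => Ok(DynValue::String(val.to_owned())),
        other => Err(format!("unsupported type {other:?}")),
    }
}

/// Turn (col_name, type, val) triples into named cells,
/// stopping at the first cell that fails to parse.
fn parse_row(cells: &[(&str, &str, &str)]) -> Result<Vec<(String, DynValue)>, String> {
    cells
        .iter()
        .map(|(name, ty, val)| parse_cell(ty, val).map(|v| (name.to_string(), v)))
        .collect()
}
```

Only the two types from the sample message are handled; any other type name is reported as an error rather than guessed.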
I submitted a SQL query that was expected to return 7 million rows, but the client threw an error:
"
ERROR [tokio-runtime-worker] [/Users/bmmcq/.cargo/registry/src/github.com-1ecc6299db9ec823/klickhouse-0.2.1/src/client.rs:168] clickhouse client failed: unexpected end of file
"
and exited with the wrong number of rows.
Here is the code:
#[tokio::main]
async fn main() {
pegasus_common::logs::init_log();
let mut opt = ClientOptions::default();
opt.default_database = "mydb".to_owned();
let client = Client::connect("myip:9000", opt)
.await
.unwrap();
let query = "select * from mytable where my_id in (8,10,11,13,15,17,19,20,22,24,26,28,30)";
let start = Instant::now();
let mut all_rows = client
.query_raw(query)
.await
.unwrap();
let mut count = 0u64;
while let Some(row) = all_rows.next().await {
println!("row {}", row.rows);
count += row.rows;
}
println!("total received {} records, used {:?};", count, start.elapsed());
}
At the moment we have the derivable klickhouse::Row trait and the raw columnar storage klickhouse::Block. It would be ideal to have a derivable, easy-to-use klickhouse::Columnar trait akin to klickhouse::Row but with the superior performance of actually being columnar.
No idea when I'll have time/motivation to implement this, but it's been on my mind.
In this example I attempt to pass the stream out of the function via a return value. It seems that it is immediately dropped when I do this?
I'm new-ish to rust so maybe this is just a lack of understanding.
use futures::stream::{Stream, StreamExt};
#[tokio::main]
async fn main() {
let client =
klickhouse::Client::connect("127.0.0.1:9000", klickhouse::ClientOptions::default())
.await
.unwrap();
let mut res = stream(client).await;
while let Some(row) = res.next().await {
let row = row.unwrap();
println!("row received {:?}", row);
}
}
#[derive(klickhouse::Row, Debug, Default)]
pub struct MyUserData {
x: String,
y: String,
}
async fn stream(
client: klickhouse::Client,
) -> impl Stream<Item = Result<MyUserData, klickhouse::KlickhouseError>> {
let all_rows = client
.query::<MyUserData>("select x, toTypeName(x) AS y from t1;")
.await
.unwrap();
all_rows
}
Expected output:
row received MyUserData { x: "1", y: "String" }
row received MyUserData { x: "2", y: "String" }
Actual output: (none)
If I rerun it with the print statement inside the other function it works.
use futures::stream::{Stream, StreamExt};
#[tokio::main]
async fn main() {
let client =
klickhouse::Client::connect("127.0.0.1:9000", klickhouse::ClientOptions::default())
.await
.unwrap();
let _ = stream(client).await;
}
#[derive(klickhouse::Row, Debug, Default)]
pub struct MyUserData {
x: String,
y: String,
}
async fn stream(
client: klickhouse::Client,
) -> impl Stream<Item = Result<MyUserData, klickhouse::KlickhouseError>> {
let mut all_rows = client
.query::<MyUserData>("select x, toTypeName(x) AS y from t1;")
.await
.unwrap();
while let Some(row) = all_rows.next().await {
let row = row.unwrap();
println!("row received {:?}", row);
}
all_rows
}
Output:
row received MyUserData { x: "1", y: "String" }
row received MyUserData { x: "2", y: "String" }
FYI values in clickhouse:
SELECT *
FROM t1
┌─x─┐
│ 1 │
└───┘
┌─x─┐
│ 2 │
└───┘
2 rows in set. Elapsed: 0.021 sec.
Dude, the error handling on this is terrible.
Most errors are not returned, but simply logged.
And there is no trace about the path it took to reach that error.
There are read_exact calls at Line#58 and Line#61.
klickhouse/klickhouse/src/io.rs
Lines 47 to 65 in ce78975
I would like to know if you have plans to add TLS support. I'm using Clickhouse Cloud and there's no other way to connect the database.
I'm building an application that allows users to execute SQL and retrieve data from Clickhouse. My use case needs to access "query_raw" and transform the data, but I cannot access Block to create functions/traits to process this data into rows, for example.
The following test currently fails:
async fn test_one<
T: Clone + std::fmt::Debug + std::cmp::PartialEq + klickhouse::ToSql + klickhouse::FromSql,
>(
sample: T,
client: &klickhouse::Client,
) {
let sample2: klickhouse::UnitValue<T> = client
.query_one(klickhouse::QueryBuilder::new("SELECT $1").arg(sample.clone()))
.await
.unwrap();
assert_eq!(sample, sample2.0);
}
#[tokio::test]
async fn values_display() {
let client = super::get_client().await;
test_one(klickhouse::Uuid::new_v4(), &client).await;
test_one(
klickhouse::DateTime::try_from(chrono::Utc::now()).unwrap(),
&client,
)
.await;
test_one(
klickhouse::Date::from(chrono::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap()),
&client,
)
.await;
}
This is because the implementation of the Display trait on Value is sometimes but not consistently compatible with the respective Clickhouse types.
For example, the following changes are required to make the above test pass:
@@ -386,19 +386,19 @@ impl fmt::Display for Value {
write!(f, "'")
}
Value::Uuid(uuid) => {
- write!(f, "'{}'", uuid)
+ write!(f, "toUUID('{}')", uuid)
}
Value::Date(date) => {
let chrono_date: NaiveDate = (*date).into();
- write!(f, "'{}'", chrono_date.format("%Y-%m-%d"))
+ write!(f, "{}", chrono_date.format("makeDate(%Y,%m,%d)"))
}
Value::DateTime(datetime) => {
let chrono_date: chrono::DateTime<Tz> =
(*datetime).try_into().map_err(|_| fmt::Error)?;
let string = chrono_date.to_rfc3339_opts(SecondsFormat::AutoSi, true);
- write!(f, "'")?;
+ write!(f, "parseDateTimeBestEffort('")?;
escape_string(f, &string)?;
- write!(f, "'")
+ write!(f, "')")
}
Value::DateTime64(datetime) => {
let chrono_date: chrono::DateTime<Tz> =
FromSql::from_sql(&Type::DateTime64(datetime.2, datetime.0), self.clone())
.map_err(|_| fmt::Error)?;
let string = chrono_date.to_rfc3339_opts(SecondsFormat::AutoSi, true);
write!(f, "parseDateTime64BestEffort('")?;
escape_string(f, &string)?;
write!(f, "', {})", datetime.2)
}
(note that DateTime64 was already correct)
I will send a draft PR with the test and the fixes for a couple of them but will probably need some time to address all Value variants.
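The idea behind the patch can be shown in isolation: a bare quoted literal such as '...' is typed String by the server, so each variant is printed as an expression that yields the intended type. The sketch below uses a hypothetical, reduced Literal enum instead of the crate's Value, takes the date as plain integers instead of going through chrono, and assumes the UUID text contains no quote characters.

```rust
use std::fmt;

/// Hypothetical, reduced stand-in for a few Value variants.
enum Literal {
    Uuid(String),
    Date { year: i32, month: u32, day: u32 },
}

impl fmt::Display for Literal {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Wrap the quoted text in a conversion so the result is a UUID, not a String.
            Literal::Uuid(uuid) => write!(f, "toUUID('{}')", uuid),
            // Build the date from its components instead of quoting it.
            Literal::Date { year, month, day } => {
                write!(f, "makeDate({},{},{})", year, month, day)
            }
        }
    }
}
```

Formatting Literal::Date { year: 2015, month: 3, day: 14 } produces makeDate(2015,3,14), which round-trips as a Date rather than as the string '2015-03-14'.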
Currently, especially when inserting data, there doesn't appear to be a good way to create a decimal number using this library. Perhaps we could support rust_decimal here as a feature?
When performing a select query on a database with big arrays, the channel created to receive the query results fails with a capacity overflow.
This happens because the client eventually fails to read the size of the array correctly. It looks like the read position is not placed correctly when reading arrays. (Hard-coding the size for a quick test returns the array values with 0's at the beginning, so all the values are shifted.)
@Protryon
Does this library support Bools? It looks like Value doesn't have it.
/*
create table trade_event (
symbol LowCardinality(FixedString(16)),
exchange_id LowCardinality(String),
trade_id UInt64,
ts DateTime64(6),
received_ts DateTime64(6),
is_buyer_maker Nullable(Bool),
price Decimal64(6),
size Decimal64(6)
)
engine = MergeTree()
partition by ts
order by (symbol, exchange_id, ts);
*/
#[derive(Row, Debug, Default)]
pub struct TradeEvent {
symbol: String,
exchange_id: String,
trade_id: u64,
ts: DateTime64<6>,
received_ts: DateTime64<6>,
is_buyer_maker: Option<bool>,
price: FixedPoint64<6>,
size: FixedPoint64<6>,
}
Reading the row currently gives the following error:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: DeserializeErrorWithColumn("is_buyer_maker", "unexpected type: Int8")',
EDIT: It looks like this is not just with Option; it fails without it as well.
Like this:
#[derive(Row, Debug, Default)]
pub struct MyUserData {
#[serde(rename = "Id")]
id: Uuid,
#[serde(rename = "UserData")]
user_data: String,
#[serde(rename = "CreatedAt")]
created_at: DateTime,
}
I started using Klickhouse on my web server, but if any earlier query fails, our server stops responding, with a protocol error: failed to send query: channel closed.
It looks like the ConnectionManager holds the connection information and instantiates a new Client. Could you add TLS support to the connection manager?