
influxdb2's Introduction

influxdb2

This is a Rust client to InfluxDB using the 2.0 API.

Setup

Add this to your Cargo.toml:

influxdb2 = "0.3"
influxdb2-structmap = "0.2"
influxdb2-derive = "0.1.1"
num-traits = "0.2"

Usage

Querying

use chrono::{DateTime, FixedOffset};
use influxdb2::{Client, FromDataPoint};
use influxdb2::models::Query;

#[derive(Debug, FromDataPoint)]
pub struct StockPrice {
    ticker: String,
    value: f64,
    time: DateTime<FixedOffset>,
}

impl Default for StockPrice {
    fn default() -> Self {
        Self {
            ticker: "".to_string(),
            value: 0_f64,
            // chrono::MIN_DATETIME and FixedOffset::east are deprecated in
            // recent chrono; MIN_UTC and east_opt are their replacements.
            time: chrono::DateTime::<chrono::Utc>::MIN_UTC
                .with_timezone(&chrono::FixedOffset::east_opt(7 * 3600).unwrap()),
        }
    }
}

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let host = std::env::var("INFLUXDB_HOST").unwrap();
    let org = std::env::var("INFLUXDB_ORG").unwrap();
    let token = std::env::var("INFLUXDB_TOKEN").unwrap();
    let client = Client::new(host, org, token);

    let qs = format!("from(bucket: \"stock-prices\") 
        |> range(start: -1w)
        |> filter(fn: (r) => r.ticker == \"{}\") 
        |> last()
    ", "AAPL");
    let query = Query::new(qs);
    let res: Vec<StockPrice> = client.query::<StockPrice>(Some(query))
        .await?;
    println!("{:?}", res);

    Ok(())
}

Writing

// The derive macro and the other imports must be in scope at module level,
// before the struct definition that uses them.
use chrono::Utc;
use futures::prelude::*;
use influxdb2::Client;
use influxdb2_derive::WriteDataPoint;

#[derive(Default, WriteDataPoint)]
#[measurement = "cpu_load_short"]
struct CpuLoadShort {
    #[influxdb(tag)]
    host: Option<String>,
    #[influxdb(tag)]
    region: Option<String>,
    #[influxdb(field)]
    value: f64,
    #[influxdb(timestamp)]
    time: i64,
}

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let host = std::env::var("INFLUXDB_HOST").unwrap();
    let org = std::env::var("INFLUXDB_ORG").unwrap();
    let token = std::env::var("INFLUXDB_TOKEN").unwrap();
    let bucket = std::env::var("INFLUXDB_BUCKET").unwrap();
    let client = Client::new(host, org, token);
    
    let points = vec![
        CpuLoadShort {
            host: Some("server01".to_owned()),
            region: Some("us-west".to_owned()),
            value: 0.64,
            time: Utc::now().timestamp_nanos(),
        },
        CpuLoadShort {
            host: Some("server02".to_owned()),
            region: None,
            value: 0.64,
            time: Utc::now().timestamp_nanos(),
        },
    ];

    client.write(bucket, stream::iter(points)).await?;
    
    Ok(())
}

Supported Data Types

InfluxDB data points don't support every data type available in Rust, so the derive macro only accepts the subset of types that InfluxDB itself supports; a combined example follows the list.

Supported struct field types:

  • bool
  • f64
  • i64
  • u64 - DEPRECATED, will be removed in version 0.4
  • String
  • Vec
  • chrono::Duration
  • DateTime
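
As an illustration, a struct combining several of these types might look like the following (a sketch; the struct and field names are hypothetical). As in the querying example above, FromDataPoint relies on a Default implementation, and DateTime does not provide one:

use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::FromDataPoint;

#[derive(Debug, FromDataPoint)]
pub struct SensorReading {
    online: bool,
    temperature: f64,
    error_count: i64,
    location: String,
    time: DateTime<FixedOffset>,
}

impl Default for SensorReading {
    fn default() -> Self {
        Self {
            online: false,
            temperature: 0.0,
            error_count: 0,
            location: String::new(),
            // DateTime has no Default impl, so pick an explicit sentinel.
            time: DateTime::<Utc>::MIN_UTC.into(),
        }
    }
}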

Features

Implemented API

  • Query API
  • Write API
  • Delete API
  • Bucket API (partial: only list, create, delete)
  • Organization API (partial: only list)
  • Task API (partial: only list, create, delete)

TLS Implementations

This crate uses reqwest under the hood. You can choose between native-tls and rustls via this crate's feature flags; native-tls is the default, matching reqwest.

# Usage for native-tls (the default).
influxdb2 = "0.3"

# Usage for rustls.
influxdb2 = { version = "0.3", features = ["rustls"], default-features = false }

Development Status

This project is still in alpha status and not all bugs have been ironed out yet. With that said, use it at your own risk, and feel free to open an issue or pull request.

influxdb2's People

Contributors

abonander, aprimadi, boudewijn26, h4l7, hpsjakob, jeremy-prater, mixmasterfresh, ngc7293, nycodeghg, p4ddy1, poster515, samiisd, ssnover, tomellm, zsluedem


influxdb2's Issues

Sum Aggregation function does not work as intended

I have the following query:

from(bucket: "dev_web3_proxy")
 |> range(start: 1678780033, stop: 1679256946)
 |> filter(fn: (r) => r["_measurement"] == "global_proxy")
 |> filter(fn: (r) => r["_field"] == "frontend_requests")
 |> filter(fn: (r) => r["chain_id"] == "1")
 |> group(columns: ["chain_id","_measurement","_field","_measurement","error_response","archive_needed"])
 |> aggregateWindow(every: 1000s, fn: mean, createEmpty: false)
 |> group()

[screenshot: query results in the InfluxDB UI]

which functions correctly. However, when I call sum instead of mean, all returned values are 0, which is incorrect. The same statement works correctly in the UI:

[screenshot: the sum query returning correct, non-zero values in the UI]

Any idea what could be off?

This is what the code returns in the case of "sum":

Influx responses are [
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 0.0, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }, 
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 0.0, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }, 
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 0.0, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }]

while with "mean", it seems correct:

Influx responses are [
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 2.5, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }, 
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 3.7222222222222223, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }, 
AggregatedRpcAccounting { chain_id: 0, field: "frontend_requests", value: 1.3333333333333333, time: 2023-03-19T20:16:40+00:00, error_response: false, archive_needed: false }]

Issue using influxdb2_structmap_derive::FromMap to parse struct

I was able to reproduce the query example, but when I attempted to add the open field, I got a "Cannot parse out map entry" panic. I suspect this is because the format of the data returned from the query does not match the layout of the struct. Here are my Cargo.toml and my main.rs:
Cargo.toml

[package]
name = "influxdb2_query_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.20.0", features = [ "full" ]}
influxdb2 = "*"
influxdb2-structmap = "*"
influxdb2-structmap-derive = "*"
futures = "*"

src/main.rs

use futures::prelude::*;
use influxdb2::models::DataPoint;
use influxdb2::models::Query;
use influxdb2::Client;
use influxdb2_structmap::FromMap;
use std::env;

#[derive(Debug, Default, influxdb2_structmap_derive::FromMap)]
pub struct StockPrice {
    ticker: String,
    value: f64,
    open: f64,
    time: i64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let host = std::env::var("INFLUXDB_HOST").unwrap();
    let org = std::env::var("INFLUXDB_ORG").unwrap();
    let token = std::env::var("INFLUXDB_TOKEN").unwrap();
    let bucket = env::var("INFLUX_BUCKET").unwrap_or("test1".into());

    let client = Client::new(host, org, token);


    println!("HealthCheck: {:#?}", client.health().await?);
    let points: Vec<DataPoint> = vec![DataPoint::builder("bar")
        .tag("ticker", "AAPL")
        .field("value", 123.46)
        .field("open", 200.0)
        .field("time", 0)
        .build()?];
    client.write(&bucket, stream::iter(points)).await?;

    let qs = format!("from(bucket: \"test1\") 
        |> range(start: -1w)
        |> filter(fn: (r) => r.ticker == \"{}\") 
        |> last()
    ", "AAPL");
    let query = Query::new(qs);

    println!("Query result was: {:#?}", client.query::<StockPrice>(Some(query))
    .await?);

    Ok(())
}

Here is the relevant cargo expand output.

pub struct StockPrice {
    ticker: String,
    value: f64,
    open: f64,
    time: i64,
}

#[automatically_derived]
#[allow(unused_qualifications)]
impl ::core::default::Default for StockPrice {
    #[inline]
    fn default() -> StockPrice {
        StockPrice {
            ticker: ::core::default::Default::default(),
            value: ::core::default::Default::default(),
            open: ::core::default::Default::default(),
            time: ::core::default::Default::default(),
        }
    }
}
use influxdb2_structmap::value::Value;
use influxdb2_structmap::GenericMap;
impl FromMap for StockPrice {
    fn from_genericmap(mut hashmap: GenericMap) -> StockPrice {
        let mut settings = StockPrice::default();
        let mut key = String::from("ticker");
        if !hashmap.contains_key(&key) {
            key = {
                let res = ::alloc::fmt::format(::core::fmt::Arguments::new_v1(
                    &["_"],
                    &[::core::fmt::ArgumentV1::new_display(&key)],
                ));
                res
            };
        }
        match hashmap.entry(key) {
            ::std::collections::btree_map::Entry::Occupied(entry) => {
                if let Value::String(v) = entry.get() {
                    settings.ticker = v.clone();
                }
            }
            _ => ::std::rt::begin_panic("Cannot parse out map entry"),
        }
        let mut key = String::from("value");
        if !hashmap.contains_key(&key) {
            key = {
                let res = ::alloc::fmt::format(::core::fmt::Arguments::new_v1(
                    &["_"],
                    &[::core::fmt::ArgumentV1::new_display(&key)],
                ));
                res
            };
        }
        match hashmap.entry(key) {
            ::std::collections::btree_map::Entry::Occupied(entry) => {
                if let Value::Double(v) = entry.get() {
                    settings.value = *v;
                }
            }
            _ => ::std::rt::begin_panic("Cannot parse out map entry"),
        }
        let mut key = String::from("open");
        if !hashmap.contains_key(&key) {
            key = {
                let res = ::alloc::fmt::format(::core::fmt::Arguments::new_v1(
                    &["_"],
                    &[::core::fmt::ArgumentV1::new_display(&key)],
                ));
                res
            };
        }
        match hashmap.entry(key) {
            ::std::collections::btree_map::Entry::Occupied(entry) => {
                if let Value::Double(v) = entry.get() {
                    settings.open = *v;
                }
            }
            _ => ::std::rt::begin_panic("Cannot parse out map entry"),
        }
        let mut key = String::from("time");
        if !hashmap.contains_key(&key) {
            key = {
                let res = ::alloc::fmt::format(::core::fmt::Arguments::new_v1(
                    &["_"],
                    &[::core::fmt::ArgumentV1::new_display(&key)],
                ));
                res
            };
        }
        match hashmap.entry(key) {
            ::std::collections::btree_map::Entry::Occupied(entry) => {
                if let Value::Long(v) = entry.get() {
                    settings.time = *v;
                }
            }
            _ => ::std::rt::begin_panic("Cannot parse out map entry"),
        }
        settings
    }
}

panicked on query containing pivot in latest version

This worked in version 0.1.

Now, in 0.4.2, the following query panics:

from ....
|> range .......
|> filter(  fn: (r) =>  r._field =~ /(fieldA) | (fieldB)/  )
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

panicked at 'called Option::unwrap() on a None value', /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/influxdb2-0.4.2/src/api/query.rs:606:75

Querying multiple datapoints with the same tag fails

Hi,

first of all, I would like to express my thanks for this library; I really appreciate your effort. There is no official InfluxDB v2 library for Rust, so without this one I'd have trouble finishing a project I'm working on in my free time. This library is like a godsend; however, it seems that querying multiple data points is not possible. I had a look at your examples and found that the queries always ended with last(). If I remove that and have more than one entry in the database with the same tag, I get the following error:

stack backtrace:
   0: std::panicking::begin_panic
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/panicking.rs:616:12
   1: <testinflux::StockPrice as influxdb2_structmap::FromMap>::from_genericmap
             at ./src/main.rs:14:17
   2: influxdb2::api::query::<impl influxdb2::Client>::query::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/influxdb2-0.2.7/src/api/query.rs:120:36
   3: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/future/mod.rs:91:19
   4: testinflux::main::{{closure}}
             at ./src/main.rs:60:48
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/future/mod.rs:91:19
   6: tokio::park::thread::CachedParkThread::block_on::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/park/thread.rs:263:54
   7: tokio::coop::with_budget::{{closure}}
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:102:9
   8: std::thread::local::LocalKey<T>::try_with
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/thread/local.rs:445:16
   9: std::thread::local::LocalKey<T>::with
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/std/src/thread/local.rs:421:9
  10: tokio::coop::with_budget
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:95:5
  11: tokio::coop::budget
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/coop.rs:72:5
  12: tokio::park::thread::CachedParkThread::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/park/thread.rs:263:31
  13: tokio::runtime::enter::Enter::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/enter.rs:152:13
  14: tokio::runtime::thread_pool::ThreadPool::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/thread_pool/mod.rs:90:9
  15: tokio::runtime::Runtime::block_on
             at /Users/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.1/src/runtime/mod.rs:484:43
  16: testinflux::main
             at ./src/main.rs:63:5
  17: core::ops::function::FnOnce::call_once
             at /rustc/addacb5878b9970ebc1665768a05cb601e7aea15/library/core/src/ops/function.rs:248:5

Are you aware of this? Is it intended? Is there another way to query data besides the one described in the examples? I'd really like to use your library, but with this issue it's currently impossible. I'd also be willing to help out if that is desired.

Anyway, thanks in advance. Your work is much appreciated!

Improve performance by parsing directly into `struct`

In working with this package and looking for performance optimisations, one possible bottleneck we identified is that query results are first read into a map (FluxRecord or GenericMap) and only then converted into the supplied struct through the FromMap trait. While we don't currently need to optimise away this last bit of overhead, we thought it might be worthwhile to share our thinking so far, both to get the ball rolling should the need arise and for others to pick it up if they feel like it.

For simplicity we'll assume the following query and struct:

from(bucket: "bucket")
|> range(start: -1d)
|> keep(columns: ["_time", "_value", "location"])

struct Row {
    value: f64,
    time: DateTime<FixedOffset>,
    location: String,
}

The main goal is for the impl FallibleIterator for QueryTableResult to have a generic Item. The way I'd go about it is by defining a

trait RowBuilder<T: ?Sized> {
    fn column_order(&mut self, columns: Vec<&str>) -> Result<(), RowBuildError>;
    fn next_val(&mut self, val: Value) -> Result<(), RowBuildError>;
    fn build_row(&mut self) -> Result<T, RowBuildError>;
}

enum RowBuildError {
    MissingValues,
    IncorrectValue,
    MissingColumn,
}

and adding a

trait Buildable<T: RowBuilder<Self>> where Self: Sized {
  fn builder() -> T;
}

then the trick would be to define the #[derive(Buildable)] macro to add to our struct Row. struct QueryTableResult would then contain the impl RowBuilder.

An implementation of RowBuilder for our Row would be:

struct Builder {
    value: Option<f64>, // fields in original struct as Options
    time: Option<DateTime<FixedOffset>>,
    location: Option<String>,
    value_index: usize, // index for each field
    time_index: usize,
    location_index: usize,
    index: usize,
}

impl RowBuilder<Row> for Builder {
    fn column_order(&mut self, columns: Vec<&str>) -> Result<(), RowBuildError> {
        let mut value_set = false;
        let mut time_set = false;
        let mut location_set = false;
        for (idx, col) in columns.iter().enumerate() {
            if *col == "_value" || *col == "value" {
                self.value_index = idx;
                value_set = true;
            }
            if *col == "_time" || *col == "time" {
                self.time_index = idx;
                time_set = true;
            }
            if *col == "_location" || *col == "location" {
                self.location_index = idx;
                location_set = true;
            }
        }
        if value_set && time_set && location_set {
            Ok(())
        } else {
            Err(RowBuildError::MissingColumn)
        }
    }

    fn next_val(&mut self, val: Value) -> Result<(), RowBuildError> {
        if self.index == self.value_index {
            match val {
                Value::Double(d) => self.value = Some(d.0),
                _ => Err(RowBuildError::IncorrectValue)?
            }
        } else if self.index == self.time_index {
            match val {
                Value::TimeRFC(t) => self.time = Some(t),
                _ => Err(RowBuildError::IncorrectValue)?
            }
        } else if self.index == self.location_index {
            match val {
                Value::String(s) => self.location = Some(s),
                _ => Err(RowBuildError::IncorrectValue)?
            }
        }
        self.index += 1;
        Ok(())
    }

    fn build_row(&mut self) -> Result<Row, RowBuildError> {
        // take() moves each value out and resets the field to None in one step,
        // avoiding a move out of &mut self for the non-Copy String field.
        let result = match (self.value.take(), self.time.take(), self.location.take()) {
            (Some(value), Some(time), Some(location)) => Ok(Row { value, time, location }),
            _ => Err(RowBuildError::MissingValues),
        };
        self.index = 0;
        result
    }
}

While the implementation of Builder isn't the easiest, I think it wouldn't be that difficult to see where it can be generalised in terms of the struct fields.
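
To make the flow concrete, here is a sketch of how a generic driver could feed a RowBuilder (the iteration scaffolding is hypothetical; only the trait and error names come from the proposal above):

fn collect_rows<T, B: RowBuilder<T>>(
    columns: Vec<&str>,
    values: impl Iterator<Item = Value>,
    mut builder: B,
) -> Result<Vec<T>, RowBuildError> {
    let width = columns.len();
    builder.column_order(columns)?;
    let mut rows = Vec::new();
    let mut seen = 0;
    for val in values {
        builder.next_val(val)?;
        seen += 1;
        // One full row's worth of values has been fed; emit the row.
        if seen == width {
            rows.push(builder.build_row()?);
            seen = 0;
        }
    }
    Ok(rows)
}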

To maintain compatibility you'd probably want an impl RowBuilder<FluxRecord> as well. I hope you find this approach constructive.

Seems to fail silently?

I know this is still alpha code, but when I try to do a write, everything appears to work: the code connects to InfluxDB, yet there's nothing in the bucket, so I'm perplexed as to where the data is going or what's happening. If it threw an error, I would have somewhere to start.

Using InfluxDB 2.4.0 for reference. I have the code on GitHub, but it's currently in a private repo.

feature request: batching

In every program I write that logs to Influx, I end up writing some version of the same batching abstraction.
This may be something you'd consider adding to the library.

My current version, for example:

use std::time::Duration;

use influxdb2::models::DataPoint;

pub struct InfluxTaskConfig {
    pub bucket: String,
    pub max_batch_size: u32,
    pub max_period_secs: u32,
}

impl Default for InfluxTaskConfig {
    fn default() -> Self {
        Self { bucket: "test".to_string(), max_batch_size: 100, max_period_secs: 60 }
    }
}

impl InfluxTaskConfig {
    /// Batches data points in groups of up to max_batch_size, waiting at most
    /// max_period_secs for a batch to fill before sending a partially full batch.
    pub async fn influx_task(self, client: influxdb2::Client, chan: postage::mpsc::Receiver<DataPoint>) {
        use futures::{stream, StreamExt};
        use futures_batch::ChunksTimeoutStreamExt;
        chan.chunks_timeout(self.max_batch_size as usize, Duration::from_secs(self.max_period_secs as u64))
            .for_each(|chunk| async {
                println!("\nsending to influx\n");

                // unhandled_unwrap is a project-local helper, not part of this crate
                client.write(self.bucket.as_str(), stream::iter(chunk)).await.unhandled_unwrap();
            }).await;
    }
}

`Option` values in structs should exclude field in case of `None`, instead of being set to `"None"`

If a struct is used like:

#[derive(Default, Debug, WriteDataPoint)]
#[measurement = "something"]
struct PowerFlowData {
    #[influxdb(tag)]
    abc: String,
    #[influxdb(field)]
    def: Option<f64>,
    #[influxdb(timestamp)]
    time: i64
}

then the option field is not handled correctly in the None case. The generated line protocol sent to the database results in an Invalid format for field def error, because the None value is encoded as the string "None", which is not a valid float value.

See

impl<T: ValueWritable> ValueWritable for Option<T> {
    fn encode_value(&self) -> String {
        match self {
            Some(v) => v.encode_value(),
            None => "\"None\"".to_string(),
        }
    }
}

The better way would be to simply exclude the field from the line protocol when it is None.
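
A minimal sketch of that behavior (illustrative only, not the crate's generated code): emit the key/value pair only when the option holds a value.

fn write_optional_field<W: std::io::Write>(
    mut w: W,
    key: &str,
    value: &Option<f64>,
) -> std::io::Result<()> {
    // Skip the field entirely when the Option is None, instead of
    // encoding the string "None".
    if let Some(v) = value {
        w.write_all(format!("{}={}", key, v).as_bytes())?;
    }
    Ok(())
}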

[BUG] Invalid URL concatenation: double slashes

Context

If the provided influxdb url ends with / (e.g. http://localhost:8086/), the following format generates an API url with double slashes:

https://github.com/aprimadi/influxdb2/blob/main/src/api/write.rs#L20

While this isn't an issue for some servers, it is for others.

What makes this issue even worse with the InfluxDB server is that the return code is 200, so this client thinks the entries were correctly added, while no new data is observed on the server.

Summary

  1. URL concatenation should avoid double slashes (see the sketch below).
  2. 200 should be considered an error for api/v2/write; success is 204.
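
A sketch of slash-safe concatenation (the helper name is hypothetical, not this crate's API):

// Normalize slashes so a base URL with or without a trailing slash produces
// the same endpoint, e.g.
// api_url("http://localhost:8086/", "api/v2/write") == "http://localhost:8086/api/v2/write"
fn api_url(base: &str, path: &str) -> String {
    format!("{}/{}", base.trim_end_matches('/'), path.trim_start_matches('/'))
}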

Strings aren't escaped properly in the derived `WriteDataPoint`

Strings that contain quotes aren't properly escaped and cause writes to fail.

Minimal example

#[derive(Default, WriteDataPoint)]
#[measurement = "quote_strings"]
struct QuoteStrings {
    #[influxdb(tag)]
    string_tag: String,
    #[influxdb(field)]
    string_field: String,
    #[influxdb(timestamp)]
    time: i64,
}

expands to:

impl ::influxdb2::models::WriteDataPoint for QuoteStrings {
    fn write_data_point_to<W>(&self, mut w: W) -> std::io::Result<()> where W: std::io::Write {
        w.write_all(format!("{},", "quote_strings").as_bytes())?;
        w.write_all(format!("{}", "string_tag").as_bytes())?;
        w.write_all(b"=")?;
        w.write_all(<String as ::influxdb2::writable::KeyWritable>::encode_key(&self.string_tag).into_bytes().as_slice())?;
        w.write_all(b" ")?;
        w.write_all(format!("{}", "string_field").as_bytes())?;
        w.write_all(b"=")?;
        w.write_all(<String as ::influxdb2::writable::ValueWritable>::encode_value(&self.string_field).into_bytes().as_slice())?;
        w.write_all(b" ")?;
        w.write_all(<i64 as ::influxdb2::writable::TimestampWritable>::encode_timestamp(&self.time).into_bytes().as_slice())?;
        w.write_all(b"\n")?;
        Ok(())
    }
}

Currently, KeyWritable just clones the string, and the ValueWritable implementation merely surrounds the string with quotes. Both result in errors if the values themselves contain quotes.

https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_reference/#special-characters
It looks like there are a few other characters that need escaping too.

I'd be happy to add a fix for this and open a PR some time next week
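
For reference, a sketch of the escaping the line protocol reference calls for (helper names are hypothetical): tag keys, tag values, and field keys escape commas, equals signs, and spaces, while string field values escape double quotes and backslashes.

// Hypothetical escaping helpers following the line protocol reference.
fn escape_tag(s: &str) -> String {
    s.replace(',', "\\,").replace('=', "\\=").replace(' ', "\\ ")
}

fn escape_string_field(s: &str) -> String {
    // Escape backslashes first so the quote escapes added next survive intact.
    format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
}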

failed to resolve: use of undeclared crate or module `influxdb2_structmap`

I'm just trying to get the example in the README to compile, and I'm getting a compilation error.

use chrono::{DateTime, FixedOffset};
use influxdb2::models::Query;
use influxdb2::{Client, FromDataPoint};

#[derive(Debug, FromDataPoint)]
pub struct StockPrice {
    ticker: String,
    value: f64,
    time: DateTime<FixedOffset>,
}

impl Default for StockPrice {
    fn default() -> Self {
        Self {
            ticker: "".to_string(),
            value: 0_f64,
            time: chrono::MIN_DATETIME.with_timezone(&chrono::FixedOffset::east(7 * 3600)),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let host = std::env::var("INFLUXDB_HOST").unwrap();
    let org = std::env::var("INFLUXDB_ORG").unwrap();
    let token = std::env::var("INFLUXDB_TOKEN").unwrap();
    let client = Client::new(host, org, token);

    let qs = format!(
        "from(bucket: \"stock-prices\") 
        |> range(start: -1w)
        |> filter(fn: (r) => r.ticker == \"{}\") 
        |> last()
    ",
        "AAPL"
    );
    let query = Query::new(qs.to_string());
    let res: Vec<StockPrice> = client.query::<StockPrice>(Some(query)).await?;
    println!("{:?}", res);

    Ok(())
}

cargo check says:

failed to resolve: use of undeclared crate or module `influxdb2_structmap`
use of undeclared crate or module `influxdb2_structmap`

use of deprecated constant `chrono::MIN_DATETIME`: Use DateTime::MIN_UTC instead
`#[warn(deprecated)]` on by default

VS Code shows the error on FromDataPoint in #[derive(Debug, FromDataPoint)].

What am I doing wrong?

Exposing the underlying reqwest client to support custom mTLS certificates.

Hello! My use case requires me to support mutual authentication between the InfluxDB2 server and the client. Adding the server's CA certificate to the client is simple enough by modifying the CA certificates present on the host machine, but I don't see a way to provide a client TLS certificate to this crate, so it defaults to a self-signed client certificate.

If I'm understanding correctly, the issue I'm running into occurs because the underlying reqwest::Client object is private (https://docs.rs/influxdb2/latest/src/influxdb2/lib.rs.html#159). If this were public, or if some kind of pass-through were provided, I believe I understand how to provide a custom TLS configuration to the client. Is there another way? If my understanding is correct and this is a feature you are willing to support, I would be happy to contribute.

Thank you.

Derive macro FromDataPoint not working for nested types?

The following usage of the FromDataPoint derive macro yields an error:

#[derive(Debug, Default, Clone, PartialEq, Eq, FromDataPoint)]
pub struct Record {
    pub metadata: Metadata,
    pub releases: Vec<Content>,
}

#[derive(Debug, Default, Clone, PartialEq, Eq, FromDataPoint)]
pub struct Metadata {
    pub version: String,
}

#[derive(Debug, Default, Clone, PartialEq, Eq, FromDataPoint)]
pub struct Content {
    pub name: String,
    pub values: String,
}

gives

error: proc-macro derive panicked
 --> src/record.rs:6:48
  |
6 | #[derive(Debug, Default, Clone, PartialEq, Eq, FromDataPoint)]
  |                                                ^^^^^^^^^^^^^
  |
  = help: message: Metadata is not handled

Is there a special way to handle nested values such as these? Thank you.

Question: timestamp

The timestamp field within the DataPointBuilder struct is an i64; however, if I want to set the timestamp myself in nanoseconds, I'd be using a u128, so when using DataPoint::builder I get a mismatched types error. Is there something I'm missing? Of course, I note that u128 isn't listed in the supported types. Many thanks, a baby Rustacean.
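
For what it's worth, nanosecond timestamps fit in an i64 until the year 2262, so a checked conversion is usually enough (a workaround sketch, not an official answer):

// Convert a u128 nanosecond timestamp into the i64 the builder expects.
let nanos: u128 = 1_650_000_000_000_000_000;
let ts: i64 = i64::try_from(nanos).expect("timestamp out of i64 range");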

Compilation fails with serde 1.0.157

error: unexpected end of input, unexpected token in nested attribute, expected ident
  --> influxdb2/src/models/ast/dialect.rs:16:13
   |
16 |     #[serde()]
   |             ^

This crate should be imported using a one-liner in `Cargo.toml`

We should merge influxdb2_derive, influxdb2_structmap, and num-traits into the same crate (maybe behind additional features) so that the Cargo.toml of any user of this crate could look something like:

influxdb2 = { version = "0.5", features = ["derive"] }

Many other crates do this, e.g., serde.

Value Enum inaccessible

It's not possible to extract a DateTime column from a FluxRecord. Since Value isn't a public enum, one can't match against it, and it doesn't provide any method to access the value if it is a Timestamp:

  • .f64() returns None
  • .u64() returns None
  • .string() returns None
  • .i64() returns None
  • .to_string() panics

It would be more ergonomic if the enum were public so one could match against it.
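
For context, here is the kind of match that would become possible if the enum were public (a sketch; Value::TimeRFC is the timestamp variant visible in the structmap output elsewhere on this page):

fn extract_time(v: &Value) -> Option<chrono::DateTime<chrono::FixedOffset>> {
    match v {
        Value::TimeRFC(t) => Some(*t),
        _ => None,
    }
}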

Missing feature: Timestamp precision

Hi,

I'm using this lib to write a script that populates an InfluxDB instance and am quite happy with it!
During that project I came across the following issue: as far as I can tell, this lib always writes the timestamp with nanosecond precision.

Do you plan to add support for the other available precision options?
https://docs.influxdata.com/influxdb/v2.5/reference/syntax/line-protocol/#timestamp
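
As a workaround under the current behavior, timestamps in coarser precisions can be scaled to nanoseconds before writing (a sketch):

// Scale a seconds-precision timestamp to the nanoseconds the writer assumes.
let seconds: i64 = 1_650_000_000;
let nanos: i64 = seconds * 1_000_000_000;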

Would you accept a Merge Request for that feature?

Regards,
Jakob

Failure to Query

Like several before me, thank you for making this library! I'm happily writing to my DB; however, I am struggling to read/run queries against it.

I have a query that returns:
[screenshot: query results table from the InfluxDB UI]

How do I read this? My current attempt is:

// AggregatedUsageData definition
#[derive(Debug, FromDataPoint)]
pub struct AggregatedUsageData {
    pub device_id: String,
    pub value: i64,
}

impl Default for AggregatedUsageData {
    fn default() -> Self {
        Self {
            device_id: "".to_string(),
            value: 0_i64
        }
    }
}

// function definition
pub async fn read_and_aggregate_timeseries(
    start_time: u32,
    connection_info: InfluxConnectionInfo<'_>,
) -> Result<Vec<AggregatedUsageData>, Box<dyn std::error::Error>> {
    let client = Client::new(
        connection_info.host,
        connection_info.org,
        connection_info.token,
    );

    let qs = format!(
        "from(bucket: \"{bucket}\")
        |> range(start: {time})
        |> filter(fn: (r) => r[\"_measurement\"] == \"device-usage\")
        |> filter(fn: (r) => r[\"_field\"] == \"usage\")
        |> group(columns: [\"deviceId\"])
        |> drop(columns: [\"_start\", \"_stop\"])
        |> sum()
        |> rename(columns: {{_value: \"value\", deviceId: \"device_id\"}})
        ",
        bucket = connection_info.bucket,
        time = start_time
    );

    log::info!("Query String:\n{}", qs);

    // Build the Query from the query string before passing it to client.query.
    let query = Query::new(qs);
    let res: Vec<AggregatedUsageData> = client.query::<AggregatedUsageData>(Some(query)).await?;

    Ok(res)
}

This results in the error: thread 'timeseries::can_read' panicked at 'called Option::unwrap() on a None value'.

Logging my query string, I know it's correct (see the screenshot above). I'm not really sure what's going on under the hood, but it seems like it should map the column values by name onto this vector? What am I doing wrong?

Query that panics

If you run the following query:

import "influxdata/influxdb/schema"
schema.measurementTagValues(bucket: "fluentbit", measurement: "avassa_container", tag: "application")

The result looks like this:

[influxdb2/src/api/query.rs:402] &map = {
    "_value": String(
        "popcorn-controller",
    ),
    "result": String(
        "_result",
    ),
    "table": Long(
        0,
    ),
}
[influxdb2/src/api/query.rs:402] &map = {
    "_value": String(
        "sec-test",
    ),
    "result": String(
        "_result",
    ),
    "table": Long(
        0,
    ),
}

I.e., _field is missing, so you get a panic in QueryResult::new in query.rs.

I'm more than happy to fix this, but would like some input on the best way forward. One option could be to have a synthetic key name, e.g. "result-X", where X is an increasing integer.
