Giter Site home page Giter Site logo

redis-async-rs's Introduction

redis-async

Using Tokio and Rust's futures to create an asynchronous Redis client. Documentation

Releases

The API is currently low-level and still subject to change.

Initially I'm focussing on single-server Redis instances, another long-term goal is to support Redis clusters. This would make the implementation more complex as it requires routing, and handling error conditions such as MOVED.

Recent changes

Version 0.14 introduces experimental TLS support, use feature flag with-rustls for Rustls support, or with-native-tls for native TLS support. There are other minor changes to the public API to enable this, in particular separate host and port arguments are required rather than a single addr argument.

Other clients

When starting this library there weren't any other Redis clients that used Tokio. However the current situation is more competitive:

Usage

There are three functions in redis_async::client which provide functionality. One is a low-level interface, a second is a high-level interface, the third is dedicated to PUBSUB functionality.

Low-level interface

The function client::connect returns a future that resolves to a connection which implements both Sink and Stream. These work independently of one another to allow pipelining. It is the responsibility of the caller to match responses to requests. It is also the responsibility of the client to convert application data into instances of resp::RespValue and back (there are conversion traits available for common examples).

This is a very low-level API compared to most Redis clients, but is done so intentionally, for two reasons: 1) it is the common demoniator between a functional Redis client (i.e. is able to support all types of requests, including those that block and have streaming responses), and 2) it results in clean Sinks and Streams which will be composable with other Tokio-based libraries.

This low-level connection will be permanently closed if the connection with the Redis server is lost, it is the responsibility of the caller to handle this and re-connect if necessary.

For most practical purposes this low-level interface will not be used, the only exception possibly being the MONITOR command.

Example

An example of this low-level interface is in examples/monitor.rs. This can be run with cargo run --example monitor, it will run until it is Ctrl-C'd and will show every command run against the Redis server.

High-level interface

client::paired_connect is used for most Redis commands (those for which one command returns one response, it's not suitable for PUBSUB, MONITOR or other similar commands). It allows a Redis command to be sent and a Future returned for each command.

Commands will be sent in the order that send is called, regardless of how the future is realised. This is to allow us to take advantage of Redis's features by implicitly pipelining commands where appropriate. One side-effect of this is that for many commands, e.g. SET we don't need to realise the future at all, it can be assumed to be fire-and-forget; but, the final future of the final command does need to be realised (at least) to ensure that the correct behaviour is observed.

In the event of a failure of communication to the Redis server, this connect will attempt to reconnect. Commands will not be automatically re-tried, however; it is for calling code to handle this and decide whether a particular command should be retried or not.

Example

See examples/realistic.rs for an example using completely artificial test data, it is realistic in the sense that it simulates a real-world pattern where certain operations depend on the results of others.

This shows that the code can be written in a straight line fashion - iterate through the outer-loop, for each make a call to INCR a value and use the result to write the data to a unique key. But when run, the various calls will be pipelined.

In order to test this, a tool like ngrep can be used to monitor the data sent to Redis, so running cargo run --release --example realistic (the --release flag needs to be set for the buffers to fill faster than packets can be sent to the Redis server) shows the data flowing:

interface: lo0 (127.0.0.0/255.0.0.0)
filter: (ip or ip6) and ( port 6379 )
#####
T 127.0.0.1:61112 -> 127.0.0.1:6379 [AP]
  *2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18..
  realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr.
  .*2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18.
  .realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr..*2..$4..INCR..$18..realistic_test_ctr
  ..
##
T 127.0.0.1:6379 -> 127.0.0.1:61112 [AP]
  :1..:2..:3..:4..:5..:6..:7..:8..:9..:10..
##
T 127.0.0.1:61112 -> 127.0.0.1:6379 [AP]
  *3..$3..SET..$4..rt_1..$1..0..*3..$3..SET..$1..0..$4..rt_1..*3..$3..SET..$4..rt_2..$1..1..*3..$3.
  .SET..$1..1..$4..rt_2..*3..$3..SET..$4..rt_3..$1..2..*3..$3..SET..$1..2..$4..rt_3..*3..$3..SET..$
  4..rt_4..$1..3..*3..$3..SET..$1..3..$4..rt_4..*3..$3..SET..$4..rt_5..$1..4..*3..$3..SET..$1..4..$
  4..rt_5..*3..$3..SET..$4..rt_6..$1..5..*3..$3..SET..$1..5..$4..rt_6..*3..$3..SET..$4..rt_7..$1..6
  ..*3..$3..SET..$1..6..$4..rt_7..*3..$3..SET..$4..rt_8..$1..7..*3..$3..SET..$1..7..$4..rt_8..*3..$
  3..SET..$4..rt_9..$1..8..*3..$3..SET..$1..8..$4..rt_9..*3..$3..SET..$5..rt_10..$1..9..*3..$3..SET
  ..$1..9..$5..rt_10..
##
T 127.0.0.1:6379 -> 127.0.0.1:61112 [AP]
  +OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+OK..+O
  K..

See note on 'Performance' for what impact this has.

PUBSUB

PUBSUB in Redis works differently. A connection will subscribe to one or more topics, then receive all messages that are published to that topic. As such the single-request/single-response model of paired_connect will not work. A specific client::pubsub_connect is provided for this purpose.

It returns a future which resolves to a PubsubConnection, this provides a subscribe function that takes a topic as a parameter and returns a future which, once the subscription is confirmed, resolves to a stream that contains all messages published to that topic.

In the event of a broken connection to the Redis server, this connection will attempt to reconnect. Any existing subscriptions, however, will be terminated, it is the responsibility of the calling code to re-subscribe to topics as necessary.

Example

See an examples/subscribe.rs. This will listen on a topic (by default: test-topic) and print each message as it arrives. To run this example: cargo run --example subscribe then in a separate terminal open redis-cli to the same server and publish some messages (e.g. PUBLISH test-topic TESTING).

Performance

I've removed the benchmarks from this project, as the examples were all out-of-date. I intend, at some point, to create a separate benchmarking repository which can more fairly do side-by-side performance tests of this and other Redis clients.

Next steps

  • Better documentation
  • Test all Redis commands
  • Decide on best way of supporting Redis transactions
  • Decide on best way of supporting blocking Redis commands
  • Ensure all edge-cases are complete (e.g. Redis commands that return sets, nil, etc.)
  • Comprehensive benchmarking against other Redis clients

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

redis-async-rs's People

Contributors

antialize avatar arniu avatar asevans48 avatar atul9 avatar augustuswm avatar benashford avatar daboross avatar gak avatar nazar-pc avatar nehliin avatar pandaman64 avatar robjtede avatar saks avatar songzhi avatar yjh0502 avatar yotamofek avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

redis-async-rs's Issues

Memoize `connect` (or at least `paired_connect`)

It makes sense to me that the default behavior of cloning a connection seems to be to re-use the underlying PairedConnectionInner and thus the underlying RespConnection. This is efficient, and it's probably what the user wants anyways.

Would it make sense to memoize connect or paired_connect so that if you pass in the same address, you can re-use the same connection? Of course, I can build that on top of this library, but I feel like it would make this library easier to use with no drawbacks.

I do think pubsub connections have to be on distinct connections than paired connections or than each other, but of course I could be wrong on that. If this is the case, it doesn't make sense to memoize on connect, but I think it still make sense to memoize on paired_connect.

Pub/Sub error: Unexpected("PUBSUB message should be encoded as an array")

Hello, I tried to run the code provided in examples folder but for subscription I get this error, I don't know how to fix it.
could you help me please.

i'm on MacOS M1 and using the redis-async = "0.16.0" also redis is hosted on docker

this is the code that I have currently

let mut stream_messages = redis_async_pubsubconn
                .subscribe(&topic)
                .await
                .unwrap();

and for receiving part

while let Some(message) = stream_messages.next().await{ 
      let resp_val = message.unwrap();
      let message = String::from_resp(resp_val).unwrap();
  }

How to connect response to a specific connection.

I hope it's alright to post this here, since this is really not a bug in the library.

I'm putting together a command-line utility to monitor any number of Redis instances asynchronously. So, essentially:

use futures::stream::SelectAll;
use futures::{prelude::*, select, stream::FuturesUnordered};

use redis_async::{
    client::{self, connect::RespConnection},
    resp::RespValue,
    resp_array,
};
use std::{convert::TryInto, error::Error};

async fn get_connection(port: u16) -> Result<RespConnection, std::io::Error> {
    let addr = format!("127.0.0.1:{}", port).parse().unwrap();
    client::connect(&addr).await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let args = std::env::args().skip(1).collect::<Vec<_>>();

    let mut connections = SelectAll::new();

    for arg in &args {
        let mut con = get_connection(arg.parse().unwrap()).await?;
        con.send(resp_array!["MONITOR"]).await?;
        let con = con.skip(1);
        connections.push(con);
    }

    while let Some(v) = connections.next().await {
        println!("{:?}", v);
    }

    Ok(())
}

This works well, but I can't for the life of me figure out how I would go about determining which stream produced a given reply. It produces RespValues but there isn't any context about the stream that produced it (as far as I can tell).

How would you suggest going about this?

Thanks again for the library!

Internal("Subscription failed, try again later...") on pubsub

I have an app where pubsub connection is created from actix_rt runtime (based on Tokio runtime), but occasionally I get error just like in #47 when trying to subscribe to the topic.

Fails the same in these configurations:

  • Linux host with Redis in Docker
  • macOS host with Redis in Docker (within VM of course)
  • Linux host with Redis on AWS (Elasticache)

Turns out that library drops connection when stream from subscription is dropped, while the expectation is that subscription will be removed and connection will remain active.
It also happens to not always being able to reconnect before subscribing to another subject, which seems to be exactly the kind of race condition Rust is supposed to avoid.

Also no reconnection happens, despite Redis didn't go away and connection from another library (https://github.com/mitsuhiko/redis-rs) is active all the time.

Support for explicit pipelining

Hi,

I was trying to use the PairedConnection with explicit pipelining and was unable to get it to work. For example, I was trying to retrieve multiple values by sending a singe request

GET foo
GET bar

to Redis. Unfortunately, PairedConnection::send does not support pipelined requests like this. Correct me if I'm wrong, but to me it seems that this is just an issue with RespValue and the way it is serialized. The closest I came to get it to work was with nested RespValue::Arrays, something like:

paired_connection.send::<String>(resp_array![resp_array!["GET", "foo"], resp_array!["GET "bar"]]).await

which raised the Redis Protocol Error ERR Protocol error: expected '$' got '*' which looks to me like the request is just serialized wrongly. Redis would expect the two pipelined GET requests to be serialized as: *2\r\n\$3\r\nGET\r\n\$3\r\nfoo\r\n*2\r\n\$3\r\nGET\r\n\$3\r\nbar\r\n, but somewhere there is a * too many. I guess in front of bar and foo there is an additional *1\r\n or nested arrays are serialized in the wrong way, something like that.

So to me it looks like this crate would support explicit pipelining with a few interface changes to RespArray. Would it be possible/desired to allow explicit pipelining in the future? I'd greatly appreciate that feature.

Panic when an unexpected message is received

I noticed an issue and I can't explain why. My application was using this crate but I encountered a panic last night, inspecting the code I noticed this line:

None => panic!("Received unexpected message: {:?}", msg),

Why there is a panic! here and not an error!? In the doc I can't see anything that says that the crate can panic in some circumstances.

Is this an expected behaviour? Can this be fixed?

Support for commands with a variable number of parameters, e.g. HMSET

Hi Ben

Is the HMSET operation supported? This doesn't seem to work, using a Vec<(String, String)> type:

        let items: Vec<(String, String)> = vec![("one".to_string(), "uno".to_string()),
                                               ("two".to_string(), "dos".to_string()),
                                               ("three".to_string(), "tres".to_string())];
        
        resp_array!["HMSET", "mykey", items]))

Update: Unfortunately, the solution is a bit more challenging:

    let items = vec![("one".to_string(), "uno".to_string()),
                     ("two".to_string(), "dos".to_string()),
                     ("three".to_string(), "tres".to_string())];

    let parts: Vec<RespValue> = 
        vec!["HMSET".into(), "mykey".into()].into_iter()
        .chain(items.into_iter().flat_map(|(a, b)| once(a.into()).chain(once(b.into()))))
        .collect();
    
    let send_fut = redis.send(Command(RespValue::Array(parts)));

What is our reconnect mechanism like?

Hi, I have some questions. I initialized a global connection using the once_cell crate, and I used it in two ways.

My test order is as follows:

  1. Start the Redis service.
  2. Run the test.
  3. View the console log.
  4. Stop the Redis service.
  5. View the console log.
  6. Start Redis.
  7. View the console log.

One way is like this (reconnect success):

#[cfg(test)]
mod test {
    use once_cell::sync::OnceCell;
    use redis_async::client::PairedConnection;
    use redis_async::error::Error;
    use redis_async::resp_array;
    use std::thread;
    use std::time::Duration;
    use tokio::join;

    static GLOBAL_CONNECTION: OnceCell<PairedConnection> = OnceCell::new();

    #[tokio::test]
    async fn test_redis() {
        let connection = redis_async::client::paired_connect("127.0.0.1", 6379)
            .await
            .unwrap();
        GLOBAL_CONNECTION.set(connection).unwrap();

        loop {
            thread::sleep(Duration::from_secs(1));
            let handle = tokio::spawn(async {
                let res: Result<String, Error> = GLOBAL_CONNECTION
                    .get()
                    .unwrap()
                    .send(resp_array!["SET", "test", "1"])
                    .await;
                if let Ok(_) = res {
                    println!("ok");
                } else {
                    println!("error");
                }
            });
            let _ = join!(handle);
        }
    }
}

One way is like this (reconnect fail):

#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;

    use once_cell::sync::OnceCell;
    use redis_async::client::PairedConnection;
    use redis_async::error::Error;
    use redis_async::resp_array;

    static GLOBAL_CONNECTION: OnceCell<PairedConnection> = OnceCell::new();

    #[tokio::test]
    async fn test_redis() {
        let connection = redis_async::client::paired_connect("127.0.0.1", 6379)
            .await
            .unwrap();
        GLOBAL_CONNECTION.set(connection).unwrap();

        loop {
            thread::sleep(Duration::from_secs(1));
            let res: Result<String, Error> = GLOBAL_CONNECTION
                .get()
                .unwrap()
                .send(resp_array!["SET", "test", "1"])
                .await;
            if let Ok(_) = res {
                println!("ok");
            } else {
                println!("error");
            }
        }
    }
}

I roughly understand that it's probably caused by multithreading. I'm new to Rust and I'm not sure if it's due to once_cell or redis_async. Can you provide some guidance? I'd also like to make some small contributions to redis_async.

Authentication

How do you handle authentication using this library? In most cases redis is protected by password for safety.

I would like to know how we handle authentication.

thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/context.rs:37:26

main.rs

let main_redis = redis_async::client::paired_connect("172.0.0.1:6800".parse().unwrap()).await.unwrap();

Cargo.toml

[dependencies]
actix-web = "*"
async-trait = "*"


redis-async = "*"

sqlx = { version = "0.4.1", features = [ 'runtime-actix-rustls', "mysql" ] }
async-std = { version = "1.6", features = [ "attributes" ] }

#redis = { version = "0.19.0", features = ["async-std-tls-comp"] }


serde = { version = "*", features = ["derive"] }
serde_json = "*"

Sentinel support

Hi!
We're using Sentinel to achieve high availability of Redis databases. It looks like the most popular Redis clients for Rust don't support it. I started digging into this topic and made redis-async connect to Sentinel + reconnect when Redis instance goes down.

Sentinel connection is basically a paired connection with additional connect/reconnect logic: to find the address of Redis instance we must iterate through sentinel hosts and send SENTINEL get-master-addr-by-name master-name command with the normal Redis protocol. The address is then used like in the normal paired connection. When the connection is dropped, we discover the Redis address again repeating the same steps.

Unfortunately, it's not possible to change reconnect logic outside of the crate, because we need to access some crate-private items. Sentinel support would have to land in the library, likely as a new connection type, since not all people need it.

@benashford Are you interested in adding Sentinel support to redis-async? If so, I could share a POC. How do you see Sentinel in the API?

PSUBSCRIBE support

Hello, do you have a plan for supporting PSUBSCRIBE command? Thanks in advance for your reply.

What is our reconnect mechanism like?

Hi, I have some questions. I initialized a global connection using the once_cell crate, and I used it in two ways.

My test order is as follows:

  1. Start the Redis service.
  2. Run the test.
  3. View the console log.
  4. Stop the Redis service.
  5. View the console log.
  6. Start Redis.
  7. View the console log.

One way is like this (reconnect success):

#[cfg(test)]
mod test {
    use once_cell::sync::OnceCell;
    use redis_async::client::PairedConnection;
    use redis_async::error::Error;
    use redis_async::resp_array;
    use std::thread;
    use std::time::Duration;
    use tokio::join;

    static GLOBAL_CONNECTION: OnceCell<PairedConnection> = OnceCell::new();

    #[tokio::test]
    async fn test_redis() {
        let connection = redis_async::client::paired_connect("127.0.0.1", 6379)
            .await
            .unwrap();
        GLOBAL_CONNECTION.set(connection).unwrap();

        loop {
            thread::sleep(Duration::from_secs(1));
            let handle = tokio::spawn(async {
                let res: Result<String, Error> = GLOBAL_CONNECTION
                    .get()
                    .unwrap()
                    .send(resp_array!["SET", "test", "1"])
                    .await;
                if let Ok(_) = res {
                    println!("ok");
                } else {
                    println!("error");
                }
            });
            let _ = join!(handle);
        }
    }
}

One way is like this (reconnect fail):

#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;

    use once_cell::sync::OnceCell;
    use redis_async::client::PairedConnection;
    use redis_async::error::Error;
    use redis_async::resp_array;

    static GLOBAL_CONNECTION: OnceCell<PairedConnection> = OnceCell::new();

    #[tokio::test]
    async fn test_redis() {
        let connection = redis_async::client::paired_connect("127.0.0.1", 6379)
            .await
            .unwrap();
        GLOBAL_CONNECTION.set(connection).unwrap();

        loop {
            thread::sleep(Duration::from_secs(1));
            let res: Result<String, Error> = GLOBAL_CONNECTION
                .get()
                .unwrap()
                .send(resp_array!["SET", "test", "1"])
                .await;
            if let Ok(_) = res {
                println!("ok");
            } else {
                println!("error");
            }
        }
    }
}

I roughly understand that it's probably caused by multithreading. I'm new to Rust and I'm not sure if it's due to once_cell or redis_async. Can you provide some guidance? I'd also like to make some small contributions to redis_async.

Reconnect pubsub

Should pubsub reconnect? I tried, but even after a minute it didn't try to reconnect. I assume it should reconnect after 30 seconds?

Log:

DEBUG  dropping I/O source: 0

Make Tokio 1.0 the default?

Hey! What do you think about making the tokio 1.0 feature the default? Currently 0.3 is the default it seems.

pipelining requests?

I thought that the explanation about implicit pipelining requires clarification. Are you suggesting that explicit pipelining redis requests, within a single batch, is no longer needed, due to the nature of how requests are sent (in order, ...) to redis?

Update benchmarks

The README described two (crude) benchmarks against redis-rs version 0.8.

Recently redis-rs 0.9 has been released, including experimental async support, we should re-run the tests using redis-rs 0.9 in both synchronous and asynchronous modes.

Connection cannot be established when using `.wait` (but works when using `tokio::run`)

I don't know whether this is expected behavior. It definitely could be the case that this library actively relies on tokio fixtures or something. However, the failure (in my opinion) could be reported more cleanly.

As it is, if I change examples/pubsub.rs to use .wait().unwrap() instead of tokio::run(...), I get

[2019-07-28T14:03:33Z ERROR redis_async::reconnect] Connection cannot be established: Cannot spawn a pubsub connection: SpawnError { is_shutdown: true }
ERROR, cannot receive messages. Error message: Connection(ConnectionFailed)

This led me down a rather annoying rabbit-hole, unfortunately, as the issue was with the event-loop rather than with networking which is implied by ConnectionFailed.

unexpected ordering

I'm having some unexpected results with a program using redis_async. For a series of items (structs with a "key" and "data" field), it does a Redis GET and, depending on the result, possibly a SET. The processing of earlier items in this sequence ought to affect the processing of later ones, but doesn't, until the next run of the program. The code is basically a simple de-duplication algorithm that only puts things in Redis if they don't exist or are different: It attempts to GET each object in the series from Redis. If not found, or the value in Redis is different, it issues a SET for the object. For identical objects it does nothing.

Given a series of three identical objects, I'd expect the program to store the value for the first object, then do nothing for the remaining two. Instead, the code performs three SET operations, as if the initial SET (or subsequent ones) had no effect. I've confirmed that the SETs did indeed happen. I want the sequence to be (GET->SET),(GET-NOOP)->(GET->NOOP), but according to the traces I put in, the actual sequence is GET, GET, GET, SET, SET, SET. The requests are being pipelined in a way that ensures the GETs complete before any SETs do. So from the standpoint of the program, the searched for object doesn't exist in Redis. Of course, when you run the program again, the object is in Redis and it correctly identifies the objects as duplicates.

My first attempt at this created a Vec of 3 futures and then used future::join_all() to return a combined set of futures. On reflection, this didn't seem right because it doesn't say anything about the order that things should happen. It seemed like it just launched all three concurrently (with the initial GETs at the head of the queue). I also tried creating a "stream" of futures with for_each(), but that didn't change the order that things happened. Is there a way to somehow create an ordered sequence of "future chains" (meaning the (GET->SET, or possibly (GET->NOOP) such that those two things completes before the next "pair of operations" happens?

Error: Connection drops, seemingly after making a first connection.

Hi Ben,
I've attempted to use your library to make a pair of redis connections for my service. The first connection is successful, but the second one fails.
After logging the errors: I am greeted with the following:

[2019-02-19T00:53:30Z DEBUG event_worker::handler] Beginning connection to Redis.
[2019-02-19T00:53:30Z INFO  redis_async::reconnect] Connection established
[2019-02-19T00:53:30Z DEBUG event_worker::handler] Making second connection to Redis.
[2019-02-19T00:53:30Z INFO  redis_async::reconnect] Connection established
[2019-02-19T00:53:30Z ERROR redis_async::client::paired] Connection to Redis closed unexpectedly
[2019-02-19T00:53:30Z DEBUG tokio_reactor] dropping I/O source: 0
[2019-02-19T00:53:30Z WARN  event_worker::handler] There was an error while attempting to receive a redis event. Redis(Remote("ERR Protocol error: expected \'$\', got \':\'"))
[2019-02-19T00:53:30Z ERROR redis_async::reconnect] Cannot perform action: Cannot write to channel: send failed because receiver is gone
[2019-02-19T00:53:30Z WARN  event_worker::handler] There was an error while attempting to receive a redis event. Redis(EndOfStream)
[2019-02-19T00:53:30Z WARN  event_worker::handler] There was an error while attempting to receive a redis event. Redis(EndOfStream)
[2019-02-19T00:53:30Z WARN  event_worker::handler] There was an error while attempting to receive a redis event. Redis(EndOfStream)

My Code: https://gist.github.com/Texlo-Dev/011a5f34556ca50ff7fa4675b56f9446
Any help is much appreciated.

Thanks,
Texlo-Dev

v0.17.0 broken build

Version
v0.17.0

Platform
MacOS

Description

Cannot build the newest redis-async:

error[E0599]: no method named `with_retries` found for struct `TcpKeepalive` in the current scope
   --> /Users/allen/.cargo/registry/src/index.crates.io-6f17d22bba15001f/redis-async-0.17.0/src/client/connect.rs:253:14
    |
250 |           let keep_alive = socket2::TcpKeepalive::new()
    |  __________________________-
251 | |             .with_time(interval)
252 | |             .with_interval(interval)
253 | |             .with_retries(1);
    | |             -^^^^^^^^^^^^ help: there is a method with a similar name: `with_time`
    | |_____________|
    | 

I am also getting this error, had to downgrade it to v0.16.1 since my project would not build with v0.17.0

EXPIRE not working?

resp_array!["EXPIRE", my_key, 90] is failing

Error:
Error("ERR Protocol error: expected '$', got ':'")

UPDATE: the seconds must be expressed as a String or &str. 👎

How to test that `PubsubConnection` is still working?

I'm working on implementing health checks for my app. I would like my app to return "not healthy" if the redis connection is somehow broken. That is fairly straight forward with PairedConnection by sending it a PING.

However I'm not sure how to accomplish that for PubsubConnection. I could try and subscribe to a random topic and see if that succeeds but feels a bit dirty.

Advice on integration with Hyper

I'm looking for advice on how to use the async-redis client in a Hyper server. This isn't really an issue or a bug, so hopefully this is an appropriate forum.

Not sure how familiar you are with Hyper, but the idea is that a Tokio reactor::Core is handed a TCP listener that returns a stream of incoming connections . Each incoming request is passed the core's handle and a Hyper "service" object that satisfies the request. Specifically, this is done in a function named "call()", which is part of the "service" trait. It's the job of "call()" to return a future that resolves the request.

Right now, my service uses a fie;d in the request to look up data in an in-memory HashMap and base its response on that data (or lack thereof). The HashMap is what I'd like to replace with a lookup against the Redis server.

I'm not sure how to approach this. I think the basic idea is:

  1. Store a reference to the Redis connection in the service object instance when it's created. Since creating the object involves a future, I'm not sure how to work this into the constructor function for my service.

  2. Inside call(), use the connection reference to send a a request to Redis and, in the returned future, read the answer and then create and return the response future that Hyper needs.

Do you have examples of this kind of usage or advice on how to structure the code, perhaps differently than what I've described above? I tried to adapt one of the example programs to do a simple send a GET and read the response, but I had numerous troubles getting it to compile.

FromResp implementation for more integers + boolean?

Like #15, maybe more useful?

When using redis, I expect commands like LLEN should only return unsigned integers, and others like SISMEMBER to only return 0/1. I'm using u32 and bool elsewhere in my code for these things, and it would be nice to easily use these in redis-async as well.

Specifically, I think FromResp should be implemented for for u8, u16, u32, u64, i8, i16, i32, isize and bool in addition to the current implementations for i64 and usize. bool would only accept 0 and 1, and the integers would accept their range, returning an error otherwise.

If you'd be willing to add this, I'll make a PR.

EXPIRE not working?

resp_array!["EXPIRE", my_key, "90"] is failing.
Error:
Error("Unexpected value, should be encoded as a SimpleString: Some(Integer(1))")
#21

PubsubStream Drop impl Calls Unsubscribe

I've applied a bandaid in my fork to remove this drop:

impl Drop for PubsubStream {
fn drop(&mut self) {
let topic: &str = self.topic.as_ref();
self.con.unsubscribe(topic);
}
}

as it is inappropriate for pattern subscriptions and cases where I want to manage the unsubscription myself.

Additionally, (and more importantly) calling unsubscribe twice for the same channel will cause all streams to silently fail.

how is it different from https://github.com/mitsuhiko/redis-rs

I am in process of exploring these two libraries. My use case is that i need it to run on top of tokio and support future. Could you please tell how it is different from mitsuhiko/redis-rs as i can see support of future and aio in this library as well?

pubsub failed on MacOs

cargo run --example pubsub failed with error:

thread 'main' panicked at 'Cannot subscribe to topic: Internal("Subscription failed, try again later...")', src/libcore/result.rs:1165:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

RespValue Bulk String to String or Integer

I get data with lrange which this data is array. But actualy its string but getting bulkstring from redis. I want to change it to string but i did not see an example for it.

let one = redis.send(Command(resp_array!["LRANGE", "generates"].append(vec!["0".to_string(),"-1".to_string()])));

        match one.await{
            Ok(res)=>{
                match res{
                    Ok(RespValue::Array(data))=>{
                        let data: Vec<i64> = data[1].into();
                        //let sparkle_heart = std::str::from_utf8(data.into()).unwrap();
                        println!("{:?}", data);
                        Ok(HttpResponse::Ok().content_type("application/json").json("Sorgu tamamlandı"))
                    },
                    _ =>{
                        Ok(HttpResponse::Ok().content_type("application/json").json("Sorgu tamamlandı"))
                    }
                }


            },
            Err(_e)=>{
                Ok(HttpResponse::InternalServerError().json("Sunucu hatası"))
            }

PairedConnection spawn error on Tokio 0.2

I'd like to try redis-async with the latest async/await syntax. After the removal of async-await-preview feature, the only viable option seems to be adding a git dependency from Tokio's master branch (v0.2.0). The code compiles but it panics when running.

Minimal working example (compiler: rustc 1.38.0-nightly (0b680cfce 2019-07-09)):

# Cargo.toml

[package]
name = "redis-async-issue38"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
redis-async = "0.4"
tokio = { git = "https://github.com/tokio-rs/tokio.git", branch = "master" }
// main.rs

#![feature(async_await)]

use std::net::SocketAddr;

use futures::compat::Future01CompatExt;
use redis_async::client;

async fn run_client(addr: SocketAddr) {
    let _ = client::paired_connect(&addr).compat().await.unwrap();
}

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:6379".parse().unwrap();
    run_client(addr).await
}

Output:

thread 'main' panicked at 'Cannot spawn future: SpawnError { is_shutdown: true }', src/libcore/result.rs:1051:5
stack backtrace:
  11: core::result::Result<T,E>::expect
             at /rustc/0b680cfce544ff9a59d720020e397c4bf3346650/src/libcore/result.rs:879
  12: redis_async::reconnect::Reconnect<A,T,RE,CE>::reconnect
             at /Users/gyk/.cargo/registry/src/github.com-1ecc6299db9ec823/redis-async-0.4.5/src/reconnect.rs:167
  13: redis_async::reconnect::reconnect
             at /Users/gyk/.cargo/registry/src/github.com-1ecc6299db9ec823/redis-async-0.4.5/src/reconnect.rs:65
  14: redis_async::client::paired::paired_connect::{{closure}}
             at /Users/gyk/.cargo/registry/src/github.com-1ecc6299db9ec823/redis-async-0.4.5/src/client/paired.rs:190

It may be related to tokio-rs/tokio#1098.

PoisonError in reconnect.rs

Hey! I'm seeing errors like this:

panic: Cannot obtain read lock: "PoisonError { inner: .. }", file: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/redis-async-0.6.5/src/reconnect.rs:131:45

I cannot quite figure out under which circumstances that can happen. Do you have any hints?

Note that I'm on version 0.6.5. Do you think updating would impact this issue?

Pub/Sub Connection Drops When no (P)Subscribers Remain

This code will error on the current release:

use redis_async::client::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    const URL: &str = "127.0.0.1";
    const CHANNEL: &str = "test";

    let pubsub = pubsub_connect(URL, 6379).await?;

    for _ in 0..100 {
        println!("subscribing to channel: {}", CHANNEL);
        let stream = pubsub.subscribe(CHANNEL).await?;
    
        println!("unsubscribing to channel: {}", CHANNEL);
        // drop calls unsubscribe
        drop(stream);
    }

    println!("subscribing to channel: {}", CHANNEL);
    let mut stream = pubsub.subscribe(CHANNEL).await?;

    stream.next().await;

    Ok(())
}

Due to the PubsubConnectionInner future resolving when there are zero remaining channels:

if self.subscriptions.is_empty() {
return Ok(false);
}

Would be good to have changelog for releases

As well as pushing git tags for the published releases. Would highly recommend having that together with a CHANGELOG.md and add the changelog items to the GitHub release after one has released it and pushed the tag. So everyone can see what are the user facing changes between versions without having to manually diff the code.

One can diff the code with for example http://diff.rs/redis-async/0.14.2/0.16.0 but not the best practice as more cumbersome than providing a changelog / release notes

RespValue only implements From<usize>, no other integers

If possible, I think it would make sense to implement From for i8, i16, i32, i64, u8, u16, u32 and isize through u32 as well.

This would make using resp_array!() with other integer types much more ergonomic, so we don't have to either cast as usize or use RespValue::Integer(x.into()) manually.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.