A fast Golang Redis client that supports Client Side Caching, Auto Pipelining, Generics OM, RedisJSON, RedisBloom, RediSearch, etc.

License: Apache License 2.0

rueidis's Introduction

rueidis

A fast Golang Redis client that does auto pipelining and supports server-assisted client-side caching.

Features


Getting Started

package main

import (
	"context"
	"fmt"

	"github.com/redis/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	// SET key val NX
	if err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error(); err != nil {
		fmt.Println("SET was not applied:", err)
	}
	// HGETALL hm
	hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
	if err != nil {
		panic(err)
	}
	fmt.Println(hm) // use the result so the program compiles
}

Check out more examples: Command Response Cheatsheet

Developer Friendly Command Builder

client.B() is the builder entry point to construct a redis command:

[Animation: developer friendly command builder]
Recorded by @FZambia, from "Improving Centrifugo Redis Engine throughput and allocation efficiency with Rueidis Go library"

Once a command is built, use either client.Do() or client.DoMulti() to send it to redis.

You ❗️SHOULD NOT❗️ reuse a built command in another client.Do() or client.DoMulti() call, because by default it has already been recycled into the underlying sync.Pool.

To reuse a command, call Pin() after Build(); this prevents the command from being recycled.

Auto Pipelining

All concurrent non-blocking redis commands (such as GET, SET) are automatically pipelined, which reduces the overall round trips and system calls and yields higher throughput. You get the benefit of pipelining simply by calling client.Do() from multiple goroutines concurrently. For example:

func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
	// the below client.Do() operations will be issued from
	// multiple goroutines and thus will be pipelined automatically.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
		}
	})
}
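
To illustrate why concurrent calls reduce round trips, here is a minimal, standard-library-only sketch of the batching idea. It is not rueidis's implementation: the pipeline type, its queue, and the flushes counter are hypothetical names, and a "flush" merely stands in for one network write.

```go
package main

import (
	"fmt"
	"sync"
)

// request is one command waiting for its reply.
type request struct {
	cmd   string
	reply chan string
}

// pipeline batches whatever requests are queued into one simulated write.
type pipeline struct {
	queue   chan request
	flushes int // number of simulated writes; only touched by run
	done    chan struct{}
}

func newPipeline() *pipeline {
	p := &pipeline{queue: make(chan request, 128), done: make(chan struct{})}
	go p.run()
	return p
}

// run waits for one request, drains everything else already queued,
// then handles the whole batch with a single simulated write.
func (p *pipeline) run() {
	defer close(p.done)
	for first := range p.queue {
		batch := []request{first}
	drain:
		for {
			select {
			case r, ok := <-p.queue:
				if !ok {
					break drain
				}
				batch = append(batch, r)
			default:
				break drain
			}
		}
		p.flushes++ // one write covers the whole batch
		for _, r := range batch {
			r.reply <- "reply:" + r.cmd
		}
	}
}

// Do enqueues a command and waits for its reply.
func (p *pipeline) Do(cmd string) string {
	r := request{cmd: cmd, reply: make(chan string, 1)}
	p.queue <- r
	return <-r.reply
}

// runDemo issues n commands from n goroutines and returns (replies, flushes).
func runDemo(n int) (int, int) {
	p := newPipeline()
	var wg sync.WaitGroup
	var mu sync.Mutex
	replies := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if p.Do(fmt.Sprint("GET k", i)) != "" {
				mu.Lock()
				replies++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	close(p.queue)
	<-p.done // wait for run to exit before reading flushes
	return replies, p.flushes
}

func main() {
	replies, flushes := runDemo(100)
	fmt.Println("replies:", replies, "flushes:", flushes)
}
```

The number of flushes depends on goroutine scheduling, but it can never exceed the number of commands, and it drops as more callers overlap.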

Benchmark comparison with go-redis v9

Compared to go-redis, Rueidis has higher throughput across 1, 8, and 64 parallelism settings.

It even achieves ~14x the throughput of go-redis in a local benchmark on a MacBook Pro 16" M1 Pro (2021). (see parallelism(64)-key(16)-value(64)-10)

[Figure: client_test_set benchmark chart]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

A benchmark result performed on two GCP n2-highcpu-2 machines also shows that rueidis can achieve higher throughput with lower latencies: #93

Manual Pipelining

Besides auto pipelining, you can also pipeline commands manually with DoMulti():

cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
    cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

Server-Assisted Client-Side Caching

The opt-in mode of server-assisted client-side caching is enabled by default and can be used by calling DoCache() or DoMultiCache() with client-side TTLs specified.

client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
    rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
    rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

Cached responses, including Redis Nils, will be invalidated either when being notified by redis servers or when their client-side TTLs are reached. See #534 for more details.
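
The two invalidation paths can be pictured with a small standard-library sketch. This is only an illustration under stated assumptions, not how rueidis stores entries: localCache, its injectable clock, and the Invalidate method (standing in for a server notification) are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a cached value together with its client-side expiry time.
type entry struct {
	val     string
	expires time.Time
}

// localCache is a hypothetical single-goroutine cache; it is not rueidis code.
type localCache struct {
	now   func() time.Time // injectable clock keeps the example deterministic
	items map[string]entry
}

func newLocalCache(now func() time.Time) *localCache {
	return &localCache{now: now, items: make(map[string]entry)}
}

// Set stores val under key with the given client-side TTL.
func (c *localCache) Set(key, val string, ttl time.Duration) {
	c.items[key] = entry{val: val, expires: c.now().Add(ttl)}
}

// Get returns the value only if it exists and its TTL has not been reached.
func (c *localCache) Get(key string) (string, bool) {
	e, ok := c.items[key]
	if !ok || !c.now().Before(e.expires) {
		delete(c.items, key)
		return "", false
	}
	return e.val, true
}

// Invalidate drops a key, as a server invalidation notice would.
func (c *localCache) Invalidate(key string) { delete(c.items, key) }

// demo reports cache hits: while fresh, after the TTL, and after invalidation.
func demo() (fresh, afterTTL, afterInvalidation bool) {
	clock := time.Unix(0, 0)
	c := newLocalCache(func() time.Time { return clock })

	c.Set("k1", "v1", time.Minute)
	_, fresh = c.Get("k1")

	clock = clock.Add(2 * time.Minute) // the client-side TTL is reached
	_, afterTTL = c.Get("k1")

	c.Set("k2", "v2", time.Minute)
	c.Invalidate("k2") // simulated notification from the server
	_, afterInvalidation = c.Get("k2")
	return
}

func main() {
	fmt.Println(demo())
}
```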

Benchmark

Server-assisted client-side caching can dramatically reduce latency and increase throughput, much like having a redis replica right inside your application. For example:

[Figure: client_test_get benchmark chart]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

Client-Side Caching Helpers

Use CacheTTL() to check the remaining client-side TTL in seconds:

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60

Use IsCacheHit() to verify if the response came from the client-side memory:

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true

If OpenTelemetry is enabled via rueidisotel.NewClient(option), two additional metrics are instrumented:

  • rueidis_do_cache_miss
  • rueidis_do_cache_hits

MGET/JSON.MGET Client-Side Caching Helpers

rueidis.MGetCache and rueidis.JsonMGetCache are handy helpers fetching multiple keys across different slots through the client-side caching. They will first group keys by slot to build MGET or JSON.MGET commands respectively and then send requests with only cache missed keys to redis nodes.
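
Grouping keys by slot can be sketched with the standard library. The slot rule used here is the one from the Redis Cluster specification (CRC16 of the key, or of its hash tag, modulo 16384); the helper names crc16, keySlot, and groupBySlot are hypothetical and this is not the code the helpers actually run.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// keySlot returns the cluster slot of key. If the key contains a non-empty
// {hash tag}, only the text between the braces is hashed.
func keySlot(key string) uint16 {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16(key) % 16384
}

// groupBySlot buckets keys by slot, preserving input order within a bucket.
func groupBySlot(keys []string) map[uint16][]string {
	groups := make(map[uint16][]string)
	for _, k := range keys {
		s := keySlot(k)
		groups[s] = append(groups[s], k)
	}
	return groups
}

func main() {
	keys := []string{"{user1}:name", "{user1}:email", "123456789"}
	for slot, ks := range groupBySlot(keys) {
		fmt.Println(slot, ks) // each bucket could become one MGET
	}
}
```

Keys that share a hash tag land in the same bucket, so they can be fetched with a single multi-key command.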

Broadcast Mode Client-Side Caching

Although the default is opt-in mode, you can use broadcast mode by specifying your prefixes in ClientOption.ClientTrackingOptions:

client, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress:           []string{"127.0.0.1:6379"},
	ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
})
if err != nil {
	panic(err)
}
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true

Please make sure that commands passed to DoCache() and DoMultiCache() are covered by your prefixes. Otherwise, their client-side cache will not be invalidated by redis.

Client-Side Caching with Cache Aside Pattern

Cache-Aside is a widely used caching strategy. rueidisaside can help you cache data into your client-side cache backed by Redis. For example:

client, err := rueidisaside.NewClient(rueidisaside.ClientOption{
    ClientOption: rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
})
if err != nil {
    panic(err)
}
val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) {
    if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows {
        val = "_nil_" // cache nil to avoid penetration.
        err = nil     // clear err in case of sql.ErrNoRows.
    }
    return
})
// ...

Please refer to the full example at rueidisaside.

Disable Client-Side Caching

Some Redis providers, e.g. Google Cloud Memorystore, don't support client-side caching. You can disable client-side caching by setting ClientOption.DisableCache to true. This also makes client.DoCache() and client.DoMultiCache() fall back to client.Do() and client.DoMulti().

Context Cancellation

client.Do(), client.DoMulti(), client.DoCache(), and client.DoMultiCache() can return early if the context is canceled or the deadline is reached.

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

Please note that though operations can return early, the command is likely sent already.
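
The "return early while the work continues" behaviour can be reproduced without a Redis server. In this sketch slowCall is a hypothetical stand-in for a request whose reply arrives late; it only mimics the pattern and is not rueidis code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall stands in for a request whose reply arrives after delay.
func slowCall(ctx context.Context, delay time.Duration) error {
	done := make(chan struct{})
	go func() {
		// The "command" is already in flight; it completes regardless of ctx.
		time.Sleep(delay)
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err() // the caller stops waiting; the work is not undone
	}
}

// timedOut reports whether a 200ms call is cut short by a 20ms deadline.
func timedOut() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return errors.Is(slowCall(ctx, 200*time.Millisecond), context.DeadlineExceeded)
}

func main() {
	fmt.Println("deadline exceeded:", timedOut())
}
```

Because the caller only stops waiting, a write such as SET may still have taken effect on the server even though the call reported a deadline error.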

Pub/Sub

To receive messages from channels, client.Receive() should be used. It supports SUBSCRIBE, PSUBSCRIBE, and Redis 7.0's SSUBSCRIBE:

err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
    // Handle the message. Note that if you want to call another `client.Do()` here, you need to do it in another goroutine or the `client` will be blocked.
})

The provided handler will be called with the received message.

Note that client.Receive() blocks until it returns in one of the following cases:

  1. return nil when receiving any unsubscribe/punsubscribe message related to the provided subscribe command.
  2. return rueidis.ErrClosing when the client is closed manually.
  3. return ctx.Err() when the ctx is done.
  4. return non-nil err when the provided subscribe command fails.

While the client.Receive() call is blocking, the Client is still able to accept other concurrent requests, and they share the same TCP connection. If your message handler may take some time to complete, it is recommended to use client.Receive() inside a client.Dedicated() so that it does not block other concurrent requests.

Alternative PubSub Hooks

client.Receive() requires users to provide a subscription command in advance. As an alternative, DedicatedClient.SetPubSubHooks() allows users to subscribe to or unsubscribe from channels later.

c, cancel := client.Dedicate()
defer cancel()

wait := c.SetPubSubHooks(rueidis.PubSubHooks{
	OnMessage: func(m rueidis.PubSubMessage) {
		// Handle the message. Note that if you want to call another `c.Do()` here, you need to do it in another goroutine or the `c` will be blocked.
	},
})
c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
err := <-wait // disconnected with err

If the hooks are not nil, the above wait channel is guaranteed to be closed once the hooks will no longer be called, and it produces at most one error describing the reason. Users can use this channel to detect disconnection.

CAS Transaction

To do a CAS Transaction (WATCH + MULTI + EXEC), a dedicated connection should be used because there should be no unintentional write commands between WATCH and EXEC. Otherwise, the EXEC may not fail as expected.

client.Dedicated(func(c rueidis.DedicatedClient) error {
    // watch keys first
    c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
    // perform read here
    c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
    // perform write with MULTI EXEC
    c.DoMulti(
        ctx,
        c.B().Multi().Build(),
        c.B().Set().Key("k1").Value("1").Build(),
        c.B().Set().Key("k2").Value("2").Build(),
        c.B().Exec().Build(),
    )
    return nil
})

Or use Dedicate() and invoke cancel() when finished to put the connection back to the pool.

c, cancel := client.Dedicate()
defer cancel()

c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// perform the remaining CAS operations with `c`, which occupies a connection

However, occupying a connection is not good in terms of throughput. It is better to use a Lua script to perform optimistic locking instead.

Lua Script

NewLuaScript and NewLuaScriptReadOnly create a script that is safe for concurrent use.

When script.Exec is called, it tries EVALSHA first and falls back to EVAL if the server returns NOSCRIPT.

script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// the script.Exec is safe for concurrent call
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
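
The EVALSHA-then-EVAL flow can be sketched without a server. EVALSHA identifies a script by the SHA1 hex digest of its body; everything else below (scriptSHA, execWithFallback, and the fakeServer that pretends to cache scripts) is hypothetical and only illustrates the idea, not the library's implementation.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// scriptSHA returns the lowercase hex SHA1 digest of a script body.
func scriptSHA(body string) string {
	sum := sha1.Sum([]byte(body))
	return hex.EncodeToString(sum[:])
}

// execWithFallback tries EVALSHA and retries with EVAL on a NOSCRIPT error.
func execWithFallback(send func(cmd, arg string) error, body string) error {
	err := send("EVALSHA", scriptSHA(body))
	if err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT") {
		return send("EVAL", body)
	}
	return err
}

// fakeServer remembers which script digests it has cached.
type fakeServer struct {
	cached map[string]bool
	log    []string // commands received, in order
}

func (s *fakeServer) send(cmd, arg string) error {
	s.log = append(s.log, cmd)
	switch cmd {
	case "EVALSHA":
		if !s.cached[arg] {
			return errors.New("NOSCRIPT No matching script")
		}
	case "EVAL":
		s.cached[scriptSHA(arg)] = true // EVAL also caches the script
	}
	return nil
}

// demo runs the same script twice and returns the commands the server saw.
func demo() []string {
	srv := &fakeServer{cached: map[string]bool{}}
	body := "return 1"
	_ = execWithFallback(srv.send, body) // EVALSHA misses, EVAL follows
	_ = execWithFallback(srv.send, body) // EVALSHA now hits
	return srv.log
}

func main() {
	fmt.Println(scriptSHA("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}"))
	fmt.Println(demo())
}
```

After the first fallback the script body no longer has to be sent, which is the point of trying EVALSHA first.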

Streaming Read

client.DoStream() and client.DoMultiStream() can be used to send large redis responses to an io.Writer directly without buffering them in memory. They work by first sending commands to a dedicated connection acquired from a pool, then directly copying the response values to the given io.Writer, and finally recycling the connection.

s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
    n, err := s.WriteTo(io.Discard)
    if rueidis.IsRedisNil(err) {
        // ...
    }
}

Note that these two methods will occupy connections until all responses are written to the given io.Writer. This can take a long time and hurt performance. Use the normal Do() and DoMulti() instead unless you want to avoid allocating memory for a large redis response.

Also note that these two methods only work with string, integer, and float redis responses. And DoMultiStream currently does not support pipelining keys across multiple slots when connecting to a redis cluster.

Memory Consumption Consideration

Each underlying connection in rueidis allocates a ring buffer for pipelining. Its size is controlled by ClientOption.RingScaleEachConn; the default value is 10, which results in each ring having a size of 2^10.

If you have many rueidis connections, you may find that they occupy quite an amount of memory. In that case, you may consider reducing ClientOption.RingScaleEachConn to 8 or 9 at the cost of potential throughput degradation.

You may also consider setting the value of ClientOption.PipelineMultiplex to -1, which will let rueidis use only 1 connection for pipelining to each redis node.
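
As a quick worked example of the arithmetic, the ring size is two raised to the scale, so each step down halves it. The helper ringSlots below is hypothetical and only does the calculation; how many bytes each slot occupies is not stated here.

```go
package main

import "fmt"

// ringSlots returns the ring size for a given scale, i.e. 2^scale.
func ringSlots(scale uint) int { return 1 << scale }

func main() {
	for _, scale := range []uint{8, 9, 10} {
		fmt.Printf("RingScaleEachConn=%d -> ring size %d\n", scale, ringSlots(scale))
	}
}
```

Going from the default of 10 down to 8 therefore shrinks each ring from 1024 to 256.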

Instantiating a new Redis Client

You can create a new redis client using NewClient and provide several options.

// Connect to a single redis node:
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})

// Connect to a redis cluster
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    ShuffleInit: true,
})

// Connect to a redis cluster and use replicas for read operations
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    SendToReplicas: func(cmd rueidis.Completed) bool {
        return cmd.IsReadOnly()
    },
})

// Connect to sentinels
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
    Sentinel: rueidis.SentinelOption{
        MasterSet: "my_master",
    },
})

Redis URL

You can use ParseURL or MustParseURL to construct a ClientOption.

The provided URL must start with either redis://, rediss:// or unix://.

Currently supported url parameters are db, dial_timeout, write_timeout, addr, protocol, client_cache, client_name, max_retries, and master_set.

// connect to a redis cluster
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
// connect to a redis node
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:6379/0"))
// connect to a redis sentinel
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
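
To see how such a URL breaks down into addresses, the standard net/url package is enough. This sketch is not ParseURL itself; the addresses helper is hypothetical and simply assumes that the URL host is the first address and every addr parameter adds one more.

```go
package main

import (
	"fmt"
	"net/url"
)

// addresses collects the URL host plus every addr query parameter.
func addresses(raw string) ([]string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	addrs := []string{u.Host}
	addrs = append(addrs, u.Query()["addr"]...)
	return addrs, nil
}

func main() {
	addrs, err := addresses("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003")
	fmt.Println(addrs, err)
}
```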

Arbitrary Command

If you want to construct commands that are absent from the command builder, you can use client.B().Arbitrary():

// This will result in [ANY CMD k1 k2 a1 a2]
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()

Working with JSON, Raw []byte, and Vector Similarity Search

The command builder treats all the parameters as Redis strings, which are binary safe. This means that users can store []byte directly into Redis without conversion. And the rueidis.BinaryString helper can convert []byte to string without copying. For example:

client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()

Treating all the parameters as Redis strings also means that the command builder doesn't do any quoting or conversion automatically for users.

When working with RedisJSON, users frequently need to encode values as JSON strings before passing them as Redis strings. rueidis.JSON can help:

client.B().JsonSet().Key("j").Path("$.myStrField").Value(rueidis.JSON("str")).Build()
// equivalent to
client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
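
The equivalence above is ordinary JSON encoding: a Go string becomes a JSON string with its quotes. A standard-library sketch, where toJSON is a hypothetical helper and not rueidis.JSON itself:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSON encodes v as a JSON document held in a Go string.
func toJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(toJSON("str"))                  // a JSON string, quotes included
	fmt.Println(toJSON(map[string]int{"a": 1})) // a JSON object
}
```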

When working with vector similarity search, users can use rueidis.VectorString32 and rueidis.VectorString64 to build queries:

cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
    Params().Nargs(2).NameValue().NameValue("V", rueidis.VectorString64([]float64{...})).
    Dialect(2).Build()
n, resp, err := client.Do(ctx, cmd).AsFtSearch()
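
A vector parameter is passed as a binary blob. The sketch below assumes the blob is the vector's float32 components packed back to back in little-endian IEEE 754 form; that layout is an assumption for illustration, and float32Blob is a hypothetical helper rather than the library's VectorString32.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// float32Blob packs v as consecutive little-endian IEEE 754 float32 values.
func float32Blob(v []float32) string {
	buf := make([]byte, 4*len(v))
	for i, f := range v {
		binary.LittleEndian.PutUint32(buf[4*i:], math.Float32bits(f))
	}
	return string(buf)
}

func main() {
	blob := float32Blob([]float32{1, 0.5})
	fmt.Printf("%d bytes: % x\n", len(blob), blob)
}
```

Because the command builder treats parameters as binary-safe strings, such a blob can be passed as a parameter value without further escaping.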

Command Response Cheatsheet

While the command builder is developer-friendly, the response parser is a little unfriendly. Developers must know what type of Redis response will be returned from the server beforehand and which parser they should use.

Error Handling: If an incorrect parser function is chosen, an errParse will be returned. Here's an example using ToArray which demonstrates this scenario:

// Attempt to parse the response. If a parsing error occurs, check if the error is a parse error and handle it.
// Normally, you should fix the code by choosing the correct parser function.
// For instance, use ToString() if the expected response is a string, or ToArray() if the expected response is an array as follows:
if _, err := client.Do(ctx, client.B().Get().Key("k").Build()).ToArray(); rueidis.IsParseErr(err) {
    fmt.Println("Parsing error:", err)
}

It is hard to remember what type of message will be returned and which parser to use. So, here are some common examples:

// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// EXPIRE
client.Do(ctx, client.B().Expire().Key("k").Seconds(1).Build()).AsInt64()
// HEXPIRE
client.Do(ctx, client.B().Hexpire().Key("h").Seconds(1).Fields().Numfields(2).Field("f1", "f2").Build()).AsIntSlice()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
// ZPOPMIN
client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
// SCAN
client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry()
// FT.SEARCH
client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch()
// GEOSEARCH
client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()

Use DecodeSliceOfJSON to scan array result

DecodeSliceOfJSON is useful when you would like to scan the results of an array into a slice of a specific struct.

type User struct {
	Name string `json:"name"`
}

// Set some values
if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil {
	return err
}
if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil {
	return err
}

// Scan MGET results into []*User
var users []*User // or []User is also scannable
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
	return err
}

for _, user := range users {
	fmt.Printf("%+v\n", user)
}
/*
&{Name:name1}
&{Name:name2}
*/

!!!!!! DO NOT DO THIS !!!!!!

Please make sure that all values in the result have the same JSON structures.

// Set a pure string value
if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil {
	return err
}

// Bad
users := make([]*User, 0)
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil {
	return err
}
// -> Error: invalid character 'u' looking for the beginning of the value
// in this case, use client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()
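
The difference between the good and the bad case can be reproduced with the standard library alone. The sketch assumes the general idea is to decode each array element as a JSON document; it is not a copy of DecodeSliceOfJSON, and decodeAll is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User struct {
	Name string `json:"name"`
}

// decodeAll unmarshals each JSON string into one element of a []User.
// It stops at the first element that is not valid JSON.
func decodeAll(raw []string) ([]User, error) {
	users := make([]User, 0, len(raw))
	for _, s := range raw {
		var u User
		if err := json.Unmarshal([]byte(s), &u); err != nil {
			return nil, err
		}
		users = append(users, u)
	}
	return users, nil
}

func main() {
	users, err := decodeAll([]string{`{"name": "name1"}`, `{"name": "name2"}`})
	fmt.Println(users, err)

	_, err = decodeAll([]string{"userName1"}) // a plain string, not JSON
	fmt.Println(err)
}
```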

Contributing

Contributions are welcome, including issues, pull requests, and discussions. Contributions mean a lot to us and help us improve this library and the community!

Generate command builders

Command builders are generated based on the definitions in ./hack/cmds by running:

go generate

Testing

Please use the ./dockertest.sh script for running test cases locally. And please try your best to have 100% test coverage on code changes.

rueidis's Issues

new feat for rueidis

Expected behavior: when a non-Redis error (for example, EOF) occurs while querying Redis, the call should return the error normally instead of continuing to retry.

Calls hang when redis stops

  • Spin up a server
  • Start making non-blocking requests to that server
  • Kill the server

At this point, the calls start to hang with variable timeouts, without respecting context deadlines. The issue seems to be inside this function: https://github.com/rueian/rueidis/blob/master/mux.go#L85

The error returned is an ECONNREFUSED, the wire is nil, and the function just loops over the retry block.

I'm not proficient in Go, but AFAIU there is no context check inside that pipe function, so the behaviour is unpredictable.

Is there a way to catch this sooner and get some sort of fail-fast behaviour?

Connection degradation during PUB/SUB

Hello @rueian , I came across one more thing - spent some time trying to figure out - but failed.

I prepared a program that demonstrates a connection degradation while performing massive PUB/SUB:

Go program
package main

import (
  "context"
  "flag"
  "log"
  "runtime"
  "strconv"
  "sync/atomic"
  "time"

  "github.com/gomodule/redigo/redis"
  "github.com/rueian/rueidis"
)

var useRedigo = flag.Bool("redigo", false, "use redigo")
var numNodes = flag.Int("nodes", 200, "num nodes")

var (
  read        int64
  write       int64
  concurrency = runtime.NumCPU()
  data        = string(make([]byte, 512))
)

func main() {
  flag.Parse()

  go func() {
  	var valRead int64
  	var valWrite int64
  	for {
  		time.Sleep(time.Second)
  		newValRead := atomic.LoadInt64(&read)
  		newValWrite := atomic.LoadInt64(&write)
  		log.Println(newValRead-valRead, "msg/sec read", newValWrite-valWrite, "msg/sec write")
  		valWrite = newValWrite
  		valRead = newValRead
  	}
  }()

  if *useRedigo {
  	runRedigo()
  } else {
  	run()
  }
}

func run() {
  client, err := rueidis.NewClient(rueidis.ClientOption{
  	InitAddress: []string{"127.0.0.1:6379"},
  })
  if err != nil {
  	panic(err)
  }
  defer client.Close()

  messages := make(chan rueidis.PubSubMessage, 1024)
  var subscribeConns []rueidis.DedicatedClient

  for i := 0; i < *numNodes; i++ {
  	conn, _ := client.Dedicate()

  	conn.SetPubSubHooks(rueidis.PubSubHooks{
  		OnMessage: func(m rueidis.PubSubMessage) {
  			atomic.AddInt64(&read, 1)
  			messages <- m
  		},
  	})

  	resp := conn.Do(context.Background(), client.B().Subscribe().Channel("request", "response"+strconv.Itoa(i)).Build())
  	if resp.Error() != nil {
  		panic(resp.Error())
  	}
  	subscribeConns = append(subscribeConns, conn)
  }

  defer func() {
  	for _, conn := range subscribeConns {
  		conn.Close()
  	}
  }()

  for i := 0; i < concurrency; i++ {
  	go func() {
  		for {
  			client.Do(context.Background(), client.B().Publish().Channel("request").Message(data).Build())
  			atomic.AddInt64(&write, 1)
  			for i := 0; i < *numNodes; i++ {
  				<-messages
  				client.Do(context.Background(), client.B().Publish().Channel("response"+strconv.Itoa(i)).Message(data).Build())
  				atomic.AddInt64(&write, 1)
  			}
  			for i := 0; i < *numNodes; i++ {
  				<-messages
  			}
  		}
  	}()
  }
  select {}
}

func runRedigo() {
  messages := make(chan redis.Message, 1024)
  var subscribeConns []redis.Conn

  for i := 0; i < *numNodes; i++ {
  	c, err := redis.Dial("tcp", "127.0.0.1:6379",
  		redis.DialReadTimeout(60*time.Second),
  		redis.DialWriteTimeout(10*time.Second),
  	)
  	if err != nil {
  		panic(err)
  	}

  	psc := redis.PubSubConn{Conn: c}

  	// Start a goroutine to receive notifications from the server.
  	go func() {
  		for {
  			switch n := psc.Receive().(type) {
  			case error:
  				panic(n)
  			case redis.Message:
  				atomic.AddInt64(&read, 1)
  				messages <- n
  			default:
  			}
  		}
  	}()

  	if err := psc.Subscribe("request", "response"+strconv.Itoa(i)); err != nil {
  		panic(err)
  	}

  	subscribeConns = append(subscribeConns, c)
  }

  defer func() {
  	for _, conn := range subscribeConns {
  		_ = conn.Close()
  	}
  }()

  for i := 0; i < concurrency; i++ {
  	go func() {
  		pubConn, err := redis.Dial("tcp", "127.0.0.1:6379",
  			redis.DialReadTimeout(10*time.Second),
  			redis.DialWriteTimeout(10*time.Second))
  		if err != nil {
  			panic(err)
  		}
  		defer func() { _ = pubConn.Close() }()
  		for {
  			_, err := pubConn.Do("PUBLISH", "request", data)
  			if err != nil {
  				panic(err)
  			}
  			atomic.AddInt64(&write, 1)
  			for i := 0; i < *numNodes; i++ {
  				<-messages
  				_, _ = pubConn.Do("PUBLISH", "response"+strconv.Itoa(i), data)
  				atomic.AddInt64(&write, 1)
  			}
  			for i := 0; i < *numNodes; i++ {
  				<-messages
  			}
  		}
  	}()
  }
  select {}
}

This program implements a sort of request-reply pattern on top of PUB/SUB (admittedly a bit artificial, but it reproduces the issue for me).

The problem is that with rueidis the connection degrades very quickly: the Go process starts consuming all the CPU on the machine, and the CPU profile shows 96% of runtime.gosched_m. Redis is mostly not loaded at this time, at maybe 2-3% CPU usage. The same program with Redigo does not have this problem.

So for me the output in the rueidis case (go run main.go) is something like:

2022/06/15 22:13:16 179939 msg/sec read 90400 msg/sec write
2022/06/15 22:13:17 33935 msg/sec read 17191 msg/sec write
2022/06/15 22:13:18 14225 msg/sec read 7059 msg/sec write
2022/06/15 22:13:19 2098 msg/sec read 1092 msg/sec write
2022/06/15 22:13:20 2430 msg/sec read 1068 msg/sec write
2022/06/15 22:13:21 1569 msg/sec read 1140 msg/sec write
2022/06/15 22:13:22 2920 msg/sec read 960 msg/sec write
...

And for Redigo (go run main.go -redigo) PUB/SUB is stable all the time:

2022/06/15 22:13:49 125744 msg/sec read 62837 msg/sec write
2022/06/15 22:13:50 119642 msg/sec read 60334 msg/sec write
2022/06/15 22:13:51 122295 msg/sec read 61811 msg/sec write
2022/06/15 22:13:52 127341 msg/sec read 63667 msg/sec write
2022/06/15 22:13:53 124588 msg/sec read 62681 msg/sec write
2022/06/15 22:13:54 123669 msg/sec read 62178 msg/sec write
2022/06/15 22:13:55 123225 msg/sec read 61936 msg/sec write
...

I am on Darwin (MacOS), Redis 6.2.1, go version go1.18.3 darwin/amd64

I'm not 100% sure you will be able to reproduce the issue on your machine; maybe increase concurrency or nodes.

A generic method for executing Redis commands

Similar to Redigo's n, err := conn.Do("APPEND", "key", "value"), it would be useful to have something like c.Do(ctx, c.B().Command("APPEND", "key", "value").Build()).

This would enable users to use new or existing modules that are not currently supported, see #10 [RedisGears Support].

Client side caching with MGet

Hello! Thank you for your work on this awesome library. Pardon me for a stupid question because I'm kind of new to Redis. I have noticed that MGet() doesn't support client-side caching. Are there any particular reasons for that? If it's just a matter of implementation then would you be open to contributions for implementing client-side caching for that command?

We want to use MGet() in particular to avoid many hops between the client and the server. For example, requesting >10000 keys would be painful with a regular Get().

Why does readMap return an array?

First of all, I'm extremely new to Go, so sorry in advance if what I'm saying doesn't make much sense.

func readMap(i *bufio.Reader) (m RedisMessage, err error) {
	length, err := readI(i)
	if err == errChunked {
		m.values, err = readE(i)
	} else {
		m.values, err = readA(i, int(length*2))
	}
	if err != nil {
		return RedisMessage{}, err
	}
	return
}

IIUC readMap will assign m.values, which is an array.
shouldn't RedisMessage have a map member?
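
For illustration only: the readA(i, int(length*2)) call above reads twice as many values as the map has entries, which suggests keys and values are kept as a flat alternating sequence. Such a sequence carries the same information as a map and can be converted on demand. A minimal sketch, where pairsToMap is a hypothetical helper and not part of rueidis:

```go
package main

import "fmt"

// pairsToMap turns a flat [k1, v1, k2, v2, ...] slice into a map.
func pairsToMap(flat []string) (map[string]string, error) {
	if len(flat)%2 != 0 {
		return nil, fmt.Errorf("odd number of elements: %d", len(flat))
	}
	m := make(map[string]string, len(flat)/2)
	for i := 0; i < len(flat); i += 2 {
		m[flat[i]] = flat[i+1]
	}
	return m, nil
}

func main() {
	m, err := pairsToMap([]string{"field1", "a", "field2", "b"})
	fmt.Println(m, err)
}
```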

Stream read does not exit when the ctx is canceled


ctx1, cancel := context.WithCancel(ctx)
go func() {
	time.Sleep(3 * time.Second)
	cancel()
}()

xReadGroup = c.XReadGroup(ctx1, XReadGroupArgs{
	Group:    "group",
	Consumer: "consumer",
	Streams:  []string{key, ">"},
	Count:    1,
	Block:    0,
})

DoMulti panic when used with no commands

Hi @rueian, a small note.

rdb.DoMulti(ctx)
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
github.com/rueian/rueidis.(*pipe).DoMulti(0xc0000e8420, {0x7cf188, 0xc000092780}, {0xa24cc0?, 0x0, 0x0})
        /home/mzhiburt/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:600 +0x934
github.com/rueian/rueidis.(*mux).pipelineMulti(0xc000092720, {0x7cf188, 0xc000092780}, {0xa24cc0, 0x0, 0x0})
        /home/mzhiburt/go/pkg/mod/github.com/rueian/[email protected]/mux.go:189 +0x9e
github.com/rueian/rueidis.(*mux).DoMulti(0x0?, {0x7cf188?, 0xc000092780?}, {0xa24cc0?, 0xc000094550?, 0x9e2b40?})
        /home/mzhiburt/go/pkg/mod/github.com/rueian/[email protected]/mux.go:151 +0x35
github.com/rueian/rueidis.(*singleClient).DoMulti(0xc0000b06f0, {0x7cf188, 0xc000092780}, {0xa24cc0?, 0x0, 0x0})
        /home/mzhiburt/go/pkg/mod/github.com/rueian/[email protected]/client.go:50 +0x9a

How to create a key using JSONSet when it does not exist

I need to create a key using JSONSet when it does not exist

err := repo.db.Do(ctx, repo.db.B().JsonSet().Key(key).Path("$."+searchOption.Province).Value(string(jsons)).Build()).Error()

But it failed with the following error message:
"ERR new objects must be created at the root"

How to create a key using JSONSet when it does not exist?

Flexible dynamic PUB/SUB API

Hello again @rueian! I started experimenting with rueidis and here is what I found:

  • I am trying to replace redigo-based implementation with rueidis. Two main reasons: performance and the fact that with redigo I have to use 3 libraries to work with standalone Redis, Sentinel Redis, Redis Cluster.
  • For sync commands, migration seems straightforward and I am observing a 2x reduction in allocations, which is super cool. I am only looking at single-instance benchmarks at the moment; hopefully at some point I'll compare the cluster case too
  • But I can not fully replace PUB/SUB layer unfortunately and this issue is about it

The reason is that PUB/SUB in rueidis does not provide API suitable for my use case.

In the system I have there is a PUB/SUB layer. It's responsible for dynamically subscribing/unsubscribing to channels. Channels are unknown on the PUB/SUB start – they can appear/disappear during application lifetime. On connection drop system reconnects to previously subscribed channels in Redis. In this case I can not afford re-subscribing from scratch on every channel added or removed. The number of channels is supposed to be very large.

This is a pretty valid pattern for Redis: i.e. sending subscribe/unsubscribe commands over a PUB/SUB connection.

With redigo it's possible to implement:

psc := redis.PubSubConn{Conn: c}
psc.Subscribe("example")
for {
    switch v := psc.Receive().(type) {
    case redis.Message:
        fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
    case redis.Subscription:
        fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
    case error:
        return v
    }
}

I.e. receiving messages is decoupled from Subscribe and Unsubscribe ops. So it's possible to control subscriptions in a desired way.

With rueidis it's only possible to pass initial channels and receive updates from them. It's still possible to use dedicated connection and Subscribe it on additional channels in a separate goroutine for example – but when push received from Redis it's dropped on rueidis level since the channel is not in the initial list passed to initial Subscribe command. So application can't process that message.

Also, rueidis has some logic that Receive() exits when (from readme):

when received any unsubscribe/punsubscribe message related to the provided subscribe command.

This is not actually desired in my case, I want PUB/SUB Receive() to work until I close it explicitly by closing dedicated connection maybe (but it does not have Close method btw). Or maybe until Context passed to Receive(ctx) is done - I suppose it should already work. Just do not stop receiving when unsubscribe from some channel received - it's a normal situation in my case.

If I have not missed something obvious, then what I am proposing here is to add some raw access to the PUB/SUB API:

  1. Do not have any additional logic in rueidis to manage PUB/SUB subscriptions (as it currently does by keeping track of initial channels) – let the application do this. I.e. pass everything from Redis to the application's message handler. Possibly have a way to create a PubSubConnection similar to Dedicated but which has Receive(ctx) without a subscribe command passed to it.
  2. Do not exit on receiving unsubscribe on a previously subscribed channel - this is a normal situation. If implementing (1) I think this won't be the case anymore.
  3. Maybe add a method to close the Dedicated (or ) connection, which will result in a return from a blocked Receive() - at least I was looking for one. In general I can control exit from Receive using the Context's cancel(), I suppose. But closing a Pub/Sub connection which observed an error seems nice to have – I don't want to proceed with this connection actually. I think cancelling the context is something that still leaves the broken connection alive (not sure though how it currently works).

Hope this makes sense!

How to get the Redis time series MADD error when the tsdb key does not exist?

cmd := client.B().TsMadd().KeyTimestampValue().KeyTimestampValue(key, timestamp, value)
res := client.Do(context.Background(), cmd.Build())
fmt.Println(res.Error(), res.NonRedisError(), res.RedisError())

When adding a data point to a non-existent time series, all three errors print as nil, but the data is not appended in Redis.
We can get the error info from res.ToAny(), but it is inconvenient to check.
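
Since the top-level reply carries no error here, the failure appears to be reported per element inside the array reply. A hedged sketch of checking each element follows. It assumes the reply has already been decoded into a `[]any` in which failed elements are Go `error` values; `firstElementError`, `sampleReply`, and the sample error text are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// firstElementError scans a decoded array reply and returns the first
// element that is an error, annotated with its position.
func firstElementError(reply []any) error {
	for i, el := range reply {
		if err, ok := el.(error); ok {
			return fmt.Errorf("element %d: %w", i, err)
		}
	}
	return nil
}

// sampleReply is made-up data: one stored timestamp and one failed sample.
func sampleReply() []any {
	return []any{int64(1660000000000), errors.New("ERR TSDB: the key does not exist")}
}

func main() {
	fmt.Println(firstElementError(sampleReply()))
}
```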

v0.0.37: panic occurs in the following use case

	// client is a rueidis client.
	for {
		time.Sleep(1 * time.Second)

		ctx := context.Background()
		resp := client.DoCache(ctx, client.B().Get().Key("abc").Cache(), time.Minute*time.Duration(1))
		if !rueidis.IsRedisNil(resp.Error()) {
			log.Println(resp.ToString())
			continue
		}

		fmt.Printf("Set")
		setErr := client.Do(ctx, client.B().Set().Key("abc").Value("abccc").ExSeconds(int64(3)).Build()).Error()
		if setErr != nil {
			log.Println(setErr)
		}
	}

This results in panic: protocol bug, message handled out of order

Panic during reconnect to malfunctioning Redis Cluster

Got the panic like this after reconnect to a Redis Cluster:

panic: protocol bug, message handled out of order

goroutine 1148 [running]:
github.com/rueian/rueidis.(*pipe)._backgroundRead(0xc0014ae210)
  /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:301 +0x725
github.com/rueian/rueidis.(*pipe)._background(0xc0014ae210)
  /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:135 +0x185
created by github.com/rueian/rueidis.(*pipe).background.func1
  /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:113 +0x5a
exit status 2

In my case I had a working Redis cluster, then stopped the cluster (I am using docker-compose from https://github.com/Grokzen/docker-redis-cluster), ran it again. Newly created cluster was not properly functional due to cluster setup error, so every operation on every node at this point returned:

127.0.0.1:7005> set x 1
(error) CLUSTERDOWN Hash slot not served

I'll try to provide a minimal reproducer example soon. This may be a bit tricky to reproduce though.

Can't get value use 'SELECT' command

Hi @rueian, I am using some commands, but they do not work as expected.

var key = "key"
move := c.Move(ctx, key, 2)
So(move.Err(), ShouldBeNil)
So(move.Val(), ShouldBeFalse)

s := c.Set(ctx, key, "hello", 0)
So(s.Err(), ShouldBeNil)
So(s.Val(), ShouldEqual, OK)

move = c.Move(ctx, key, 2)
So(move.Err(), ShouldBeNil)
So(move.Val(), ShouldBeTrue)

g := c.Get(ctx, key)
So(g.Err(), ShouldNotBeNil)
So(IsNil(g.Err()), ShouldBeTrue)
So(g.Val(), ShouldBeEmpty)

s = c.Select(ctx, 2)
So(s.Err(), ShouldBeNil)
So(s.Val(), ShouldEqual, OK)

g = c.Get(ctx, key)
So(g.Err(), ShouldBeNil)
So(g.Val(), ShouldEqual, "hello")

After SELECT 2, GET key should return the value, but I got a redis.Nil error.

The version is v0.0.76, with a single Redis server.

RedisLabs Enterprise Support

When using the RedisLabs Enterprise Docker image, I'm not able to make a connection due to the HELLO command not being supported in the enterprise edition ...

$ /opt/redislabs/bin/redis-cli -p 12000
127.0.0.1:12000> HELLO
(error) ERR unknown command 'HELLO'

I don't see this called out on the compatibility list but confirmed with RedisLabs.

Connection close not detected and deadlock on acquire

Hello, me again with a couple of new issues.

New program which demonstrates a couple of issues with rueidis.

package main

import (
	"flag"
	"log"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/rueian/rueidis"
)

var useRedigo = flag.Bool("redigo", false, "use redigo")

func main() {
	flag.Parse()
	if *useRedigo {
		runRedigo()
	} else {
		run()
	}
}

func run() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		log.Println("start pub/sub")
		conn, _ := client.Dedicate()
		wait := conn.SetPubSubHooks(rueidis.PubSubHooks{
			OnMessage: func(m rueidis.PubSubMessage) {

			},
		})
		log.Println("pub/sub is ready")
		err := <-wait
		log.Println("pub/sub error", err)
		conn.Close()
		time.Sleep(1 * time.Millisecond)
	}
}

func runRedigo() {
	pool := &redis.Pool{
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "127.0.0.1:6379",
				redis.DialReadTimeout(60*time.Second),
				redis.DialWriteTimeout(10*time.Second),
			)
		},
	}

LOOP:
	for {
		log.Println("start pub/sub")
		conn := pool.Get()
		psc := redis.PubSubConn{Conn: conn}

		log.Println("pub/sub is ready")

		for {
			switch n := psc.Receive().(type) {
			case error:
				log.Println("pub/sub error", n)
				_ = conn.Close()
				time.Sleep(1 * time.Millisecond)
				continue LOOP
			default:
			}
		}
	}
}

Problem 1

  1. Run Redis
  2. Run program above
  3. Stop Redis

Redigo-based program detects connection close and goes to reconnect loop, Rueidis-based program does not.

Looks like Rueidis detects connection close only upon the next write operation.

Problem 2

I added a periodic write to help Rueidis find a broken connection after stopping Redis. It goes to reconnect loop – but at some point (very quickly and I can reliably reproduce this every time) it deadlocks on connection acquire (inside client.Dedicate() call) – so reconnecting stops. Redigo-based program does not have this issue.

From the goroutine dump of deadlocked-program:

goroutine 1 [sync.Cond.Wait]:
sync.runtime_notifyListWait(0xc000074110, 0x0)
	/usr/local/Cellar/go/1.18.3/libexec/src/runtime/sema.go:513 +0x13d
sync.(*Cond).Wait(0xc00007e080?)
	/usr/local/Cellar/go/1.18.3/libexec/src/sync/cond.go:56 +0x8c
github.com/rueian/rueidis.(*pool).Acquire(0xc00007e230)
	/Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pool.go:31 +0x45
github.com/rueian/rueidis.(*mux).Acquire(0x10b8650?)
	/Users/fz/go/pkg/mod/github.com/rueian/[email protected]/mux.go:208 +0x1d
github.com/rueian/rueidis.(*singleClient).Dedicate(0xc00000e1b0)
	/Users/fz/go/pkg/mod/github.com/rueian/[email protected]/client.go:73 +0x2d
main.run()
	/Users/fz/projects/centrifugal/centrifugo-pro/internal/pro/redengine/exp/main.go:35 +0x1fb
main.main()
	/Users/fz/projects/centrifugal/centrifugo-pro/internal/pro/redengine/exp/main.go:20 +0x72

Dynamic cmd building

Hey, @rueian !

Is it possible to do dynamic cmd building depending on code logic?

Here's an example of what I'm trying to do:

// q is struct with query fields

cmd := client.B().FtAggregate().
    Index("books").Query("@genre:{" + q.Genre + "}").
    Groupby(1).Property("@title")

// now I need to do conditional logic
// and continue building the cmd

if q.SortAsc {
    cmd = cmd.Sortby(2).Property(q.SortProperty).Asc() // but this won't work*
} else {
    cmd = cmd.Sortby(2).Property(q.SortProperty).Desc()
}

* The reason it doesn't work is that cmd is of type cmds.FtAggregateOpGroupbyProperty, so I cannot change it anymore.

Also, since the cmds package is internal, I cannot initialize a variable of the final "chain" type I want to then assign the first cmd with chained methods to that final variable. For example:

var finalCmdDesc cmds.FtAggregateOpSortbyFieldsOrderDesc
var finalCmdAsc cmds.FtAggregateOpSortbyFieldsOrderAsc

// ...

if q.SortAsc {
    finalCmdAsc = cmd.Sortby(2).Property(q.SortProperty).Asc()
} else {
    finalCmdDesc = cmd.Sortby(2).Property(q.SortProperty).Desc()
}

Even with that, I would need additional bool flags to know which one to use in the end. 😅
Or I could use generics... 🤔
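
One way around the typed builder chain is to assemble the raw argument list conditionally and hand it to whatever arbitrary-command escape hatch the client offers. The sketch below only builds the argument slice; the `query` struct mirrors the fields used above, and `aggregateArgs` is a hypothetical helper, not part of rueidis.

```go
package main

import (
	"fmt"
	"strings"
)

// query mirrors the fields used in the example above.
type query struct {
	Genre        string
	SortProperty string
	SortAsc      bool
}

// aggregateArgs builds the FT.AGGREGATE arguments step by step,
// adding the SORTBY clause only when a sort property is set.
func aggregateArgs(q query) []string {
	args := []string{"FT.AGGREGATE", "books", "@genre:{" + q.Genre + "}",
		"GROUPBY", "1", "@title"}
	if q.SortProperty != "" {
		order := "DESC"
		if q.SortAsc {
			order = "ASC"
		}
		// SORTBY 2 counts the property and its order as two arguments.
		args = append(args, "SORTBY", "2", q.SortProperty, order)
	}
	return args
}

func main() {
	q := query{Genre: "fantasy", SortProperty: "@title", SortAsc: true}
	fmt.Println(strings.Join(aggregateArgs(q), " "))
}
```

Because every step appends to the same `[]string`, no intermediate builder type has to be named.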

Make `RedisResult`.`AsMap`/`ToMap` return `map[string]*RedisMessage`

Hi,

I am not sure if it's critical to return a pointer or not.
But I am sure that it could work better with methods on *RedisMessage.

// We would be able to do this
values, _ := resp.AsMap()
m, _ := values["Something"].ToString()

// Currently we can't because we can't take an address of a map value.
// So we do the following
val := values["Something"]
from, _ := val.ToString()

Thanks,
Take care.
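
The limitation described above comes from Go itself: map elements are not addressable, so a pointer-receiver method cannot be called directly on a map value. A self-contained sketch of both variants follows; `Message` is a hypothetical stand-in for a reply type with pointer-receiver helpers.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a reply type whose helpers
// use pointer receivers.
type Message struct{ s string }

func (m *Message) ToString() (string, error) { return m.s, nil }

// viaCopy works with map[string]Message: the value must be copied into
// an addressable variable first, because values[key].ToString() does
// not compile.
func viaCopy(values map[string]Message, key string) string {
	val := values[key]
	s, _ := val.ToString()
	return s
}

// viaPointer works with map[string]*Message: the method can be called
// on the looked-up pointer, but a missing key yields nil and must be checked.
func viaPointer(values map[string]*Message, key string) string {
	msg, ok := values[key]
	if !ok {
		return ""
	}
	s, _ := msg.ToString()
	return s
}

func main() {
	fmt.Println(viaCopy(map[string]Message{"Something": {s: "hello"}}, "Something"))
	fmt.Println(viaPointer(map[string]*Message{"Something": {s: "hello"}}, "Something"))
}
```

The pointer variant saves the temporary variable but moves the burden to a nil check for absent keys.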

How do I decode the results returned by FT.SEARCH and get the total

I used the command in the screenshot below with the limit set to 0. According to the rules of the Redis FT.SEARCH command, it will return the total number of results that meet the conditions. I need to get that total from the returned result. How should I decode the result and get the total number?

image
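
In the classic array-style reply, FT.SEARCH puts the total number of matches in the first element, followed by the documents (none when the limit is 0). A minimal sketch of reading it is shown below. It assumes the reply has already been decoded into a `[]any` whose first element is an `int64`; a RESP3 reply may be shaped differently. `searchTotal` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// searchTotal extracts the total hit count from an array-style
// FT.SEARCH reply, where the first element is the total.
func searchTotal(reply []any) (int64, error) {
	if len(reply) == 0 {
		return 0, errors.New("empty FT.SEARCH reply")
	}
	total, ok := reply[0].(int64)
	if !ok {
		return 0, fmt.Errorf("unexpected type %T for total", reply[0])
	}
	return total, nil
}

func main() {
	// Made-up reply for a query with LIMIT 0 0: only the total is present.
	total, err := searchTotal([]any{int64(42)})
	fmt.Println(total, err)
}
```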

How to make the response time of FT.Search as short as possible with 500 user requests per second?

The service scenarios are as follows:

  1. Each user request will result in 5-10 different ft.search queries
  2. After querying the RedisJSON that meets the conditions, the JSON data need to be modified
  3. A maximum of 500 users may request the system at the same time per second

go-redis (for example: res, err := r.Do("FT.SEARCH", "index", "@status:{0913|4912}", "LIMIT", 0, 1, "RETURN", 1, "$.Number").Result()) is currently used to implement steps 1-2 above. Each user request opens a separate Redis connection, and the connection is closed at the end of steps 1-2.

Due to the excellent performance of RediSearch+RedisJSON, the average response time can be less than 200ms when there are only 5 to 10 user requests per second, but when there are more than 30~50 user requests per second, the average response time increases exponentially to 20 to 30 seconds. I think that redis is blocked because of the significant increase in the number of queries.

Can rueidis significantly reduce response time when QPS exceeds 30, or even 300 to 500? What is the correct way to use rueidis to minimize response times for FT.SEARCH queries?

Can the redis JSON value be converted into struct+map form?

Example value in Redis:

{
    "Uid":"xxx",
    "Phone":"139xxxxxxxxx",
    "hobit":{
        "swimming":"yes",
        "running":"yes",
        "football":"yes"
    }
}

I want to get a Redis JSON value like the one above from the Redis server, then convert it into struct+map form like below:

type T struct {
	Uid   string `json:"Uid"`
	Phone string `json:"Phone"`
	Hobit map[string]interface{} `json:"hobit"`
}

Is there a function that can handle this?
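
Once the raw JSON string has been fetched, the standard library can already decode it into that shape. A short sketch using `encoding/json` follows. It assumes the string is the object itself rather than the wrapping array that a `$` JSONPath query returns; `decode` and `sample` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// T matches the struct from the question, with a map for the nested object.
type T struct {
	Uid   string         `json:"Uid"`
	Phone string         `json:"Phone"`
	Hobit map[string]any `json:"hobit"`
}

// sample stands in for the string returned by the server.
const sample = `{"Uid":"xxx","Phone":"139xxxxxxxxx","hobit":{"swimming":"yes","running":"yes","football":"yes"}}`

// decode unmarshals the raw JSON document into T.
func decode(raw string) (T, error) {
	var t T
	err := json.Unmarshal([]byte(raw), &t)
	return t, err
}

func main() {
	t, err := decode(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(t.Uid, t.Phone, t.Hobit["running"])
}
```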

Request for manual pipelining

In my case, I need to sync a lot of data to Redis from my DB or a CSV file,
so manual pipelining is needed.

I tested rueidis (with defaults) and go-redis v8 (with a pipeline) by importing 2K items into Redis, and v8 was quicker.

PS: thanks for the great work on rueidis.
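
Regardless of how a client pipelines internally, a bulk import is commonly split into fixed-size batches so that each batch can be sent in a single call (for example, a DoMulti-style call). The sketch below shows only the batching step; `chunk` and the batch size of 500 are illustrative assumptions, not a recommendation from the library.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// The batches share the backing array of items; nothing is copied.
func chunk(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	keys := make([]string, 2000)
	for i := range keys {
		keys[i] = fmt.Sprintf("item:%d", i)
	}
	for i, batch := range chunk(keys, 500) {
		// Each batch would be turned into commands and sent together.
		fmt.Printf("batch %d: %d items\n", i, len(batch))
	}
}
```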

Add ability to disable local caching

Currently, "cache size each conn" is set to the default value if 0 is provided: https://github.com/rueian/rueidis/blob/master/rueidis.go#L201-L203, https://github.com/rueian/rueidis/blob/8b21fc0a8614397bbe4beada7292f2b9431a9d18/pipe.go#L63. Perhaps we could skip the LRU entirely if zero has been passed? However, that would be a breaking change. Perhaps a separate flag could exist such as DisableLocalCache?

Ran into this while doing thanos-io/thanos#5593. I think some users might not want to cache items locally if their systems don't have much RAM, for example.

syntax error in prefix argument

Currently, calling B().ClientTracking().On().Prefix("a","b") produces client tracking on prefix a b, which is a syntax error according to Redis:

127.0.0.1:6379> client tracking on prefix mcs: dd: BCAST
(error) ERR syntax error
127.0.0.1:6379> client tracking on prefix mcs: prefix dd: BCAST
OK
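
As the session above shows, the server expects the PREFIX keyword to be repeated before every prefix. A small sketch of generating the argument list that way is given below; `trackingArgs` and `commandLine` are hypothetical helpers and not the library's builder.

```go
package main

import (
	"fmt"
	"strings"
)

// trackingArgs builds CLIENT TRACKING ON in broadcast mode, emitting
// one PREFIX keyword per prefix.
func trackingArgs(prefixes ...string) []string {
	args := []string{"CLIENT", "TRACKING", "ON"}
	for _, p := range prefixes {
		args = append(args, "PREFIX", p)
	}
	return append(args, "BCAST")
}

// commandLine joins the arguments for display.
func commandLine(args []string) string {
	return strings.Join(args, " ")
}

func main() {
	fmt.Println(commandLine(trackingArgs("mcs:", "dd:")))
	// CLIENT TRACKING ON PREFIX mcs: PREFIX dd: BCAST
}
```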

Stream read group with block 0

XReadGroup with Block set to 0 returns context.DeadlineExceeded:

xReadGroup = c.XReadGroup(context.Background(), XReadGroupArgs{
	Group:    "group",
	Consumer: "consumer",
	Streams:  []string{key, ">"},
	Block:    0,
})
fmt.Println(xReadGroup.Err())

Output:
context deadline exceeded

Is it safe to share a connection between goroutine to increment a common key?

Hi,

I wanted to ask one question,

Is it safe to share the same connection between multiple goroutines to increment a common (same) key, without worrying that different goroutines incrementing the same key will result in an incorrect value?

I am using KeyDB (which is multithreaded) in place of Redis with this library.

Thanks for the great work.

Proposal for a go-redis like high level api

Hi, I plan to use rueidis for some of my next projects, and I must say that I like your "bottom-up" approach. But, currently writing commands is quite cumbersome. I would like to propose a design for a high-level API similar to what go-redis has.

Rueidis' "Getting Started" section uses this:

// SET key val NX
c.Do(ctx, c.B().Set().Key("key").Value("val").Nx().Build()).Error()
// GET key
c.Do(ctx, c.B().Get().Key("key").Build()).ToString()

What if we could turn that into:

// SET key val NX
c.SetNX(ctx, "key", "val").Error()
// GET key
c.Get(ctx, "key").ToString()

Well maybe it's not hard to do so, this was the most straightforward thing I could think of. We basically wrap cmds.Builder:

func (c *singleClient) SetNX(ctx context.Context, key, value string) (resp RedisResult) {
	cmd := c.B().Set().Key(key).Value(value).Nx().Build()
	resp = c.Do(ctx, cmd)
	cmds.Put(cmd.CommandSlice())
	return resp
}

func (c *singleClient) Get(ctx context.Context, key string) (resp RedisResult) {
	cmd := c.B().Get().Key(key).Build()
	resp = c.Do(ctx, cmd)
	cmds.Put(cmd.CommandSlice())
	return resp
}

The issue comes when trying to implement things like caching, how could the user specify to use client-side caching?

We could create a separate function for each command?
Each command would have two functions, * and *Cache (where * is the command).

func (c *singleClient) SetNXCache(ctx context.Context, key, value string) (resp RedisResult) 

func (c *singleClient) GetCache(ctx context.Context, key, value string) (resp RedisResult) 

Or perhaps by some kind of hacky optional boolean?

func (c *singleClient) SetNXCache(ctx context.Context, key, value string, cache ...bool) (resp RedisResult) 

func (c *singleClient) GetCache(ctx context.Context, key, value string, cache ...bool) (resp RedisResult) 

Or still something different?

Let me know what you think about this, If we can decide on a concise design I wouldn't mind implementing it.

JsonArrappend Fails to Append?

I'm trying to use rueidis to manipulate arrays in a json object I've stored in redis:

json.set foo . '{"hello": "people", "colours": []}'

The following works within the redis-cli:

127.0.0.1:6379> JSON.ARRAPPEND foo $.colours '"red"'
1) (integer) 1
127.0.0.1:6379> JSON.ARRAPPEND foo $.colours '"blue"'
1) (integer) 2
127.0.0.1:6379> json.get foo $.colours
"[[\"red\",\"blue\"]]"

However, when using the library it appears to fail:

err := con.Do(ctx, con.B().JsonArrappend().Key("foo").Path("$.colours").Value("red").Build()).Error()
log.Fatal(err)

2022/06/07 15:09:48 expected value at line 1 column 1

It appears to fail because the resulting string isn't quoted properly. Reproducing this in the console as below, you can see what happens when a string has double quotes, but not the outer single quotes.

127.0.0.1:6379> JSON.ARRAPPEND foo .colours "red"
(error) expected value at line 1 column 1

The same is true for other operations.
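
In redis-cli the outer single quotes are consumed by the CLI, so the server receives the JSON text `"red"` including its double quotes, whereas a plain Go string `"red"` is sent as the bare word red, which is not valid JSON. One way to get the quoting right is to encode the value with `encoding/json` before passing it on. A minimal sketch, where `jsonArg` is a hypothetical helper:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonArg encodes v as JSON text suitable for a RedisJSON value argument.
func jsonArg(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, _ := jsonArg("red")
	fmt.Println(s) // "red" (with the double quotes)

	list, _ := jsonArg([]string{"red", "blue"})
	fmt.Println(list) // ["red","blue"]
}
```

The encoded string would then be given to the builder's Value(...) in place of the raw Go string.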

Redis Sentinel support

Hello @rueian. Just came across rueidis in the Go weekly mailing list. Many thanks for the library, great job. Decisions made on API look really nice.

One thing that is missing for me to painlessly try this lib in my projects is the lack of Redis Sentinel support. Do you have plans to support Sentinel inside rueidis? Or maybe as a separate library built on top of rueidis?

Error connecting to redis 5.0.7 (Ubuntu 20.04)

It returns ERR unknown command HELLO, with args beginning with: 3,

docker compose:

version: '3.7'

services:
    
  # REDIS: 3 MB
  redis1:
    image: redis:5.0.7
    command: redis-server --appendonly yes
    ports:
      - 6379:6379
    volumes:
      - ./tmp/redis:/data
    restart: unless-stopped

volumes:
  redisdata1:

code to reproduce:

package main
import "fmt"
import redis "github.com/rueian/rueidis"

func main() {
	conn, err := redis.NewClient(redis.ClientOption{
		InitAddress: []string{`127.0.0.1:6379`},
		Password:    ``,
		SelectDB:    4,
	})
	fmt.Println(conn,err)
}

go mod:

module d

go 1.19

require github.com/rueian/rueidis v0.0.73

Panic on `rdb.Do` when Redis is not up

Hi there
I've just noticed this behavior. I guess it's not supposed to happen, since we return an error?

	rdb, _ := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"},
	})
	defer rdb.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	err := rdb.Do(ctx, rdb.B().Ping().Build()).Error()
	if err != nil {
		log.Fatal("[redis] ping was not successfull", zap.Error(err))
	}
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5cddee]

goroutine 1 [running]:
github.com/rueian/rueidis.(*clusterClient).Close(0x0)
        /github.com/rueian/[email protected]/cluster.go:580 +0x2e
panic({0x6df000, 0x9e1540})
        /usr/lib/golang/src/runtime/panic.go:838 +0x207
github.com/rueian/rueidis.(*clusterClient).B(0x7cf150?)
        /github.com/rueian/[email protected]/cluster.go:255
main.main()
        /main.go:61 +0x1ef
exit status 2

Take care

Context Canceled occurs when a JSONSet operation is performed

for Count < NeedCount && LoopTimes < LoopCount {
 //The for loop takes about 8-15 seconds,
}

err := repo.db.Do(ctx, repo.db.B().JsonSet().Key(key).Path("$."+searchOption.Province+"."+searchOption.City).Value(strconv.Itoa(isGetting)).Build()).Error()

When the for loop completes, executing JsonSet fails with "Context Canceled".
How to avoid Context Canceled?
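
A context carries a single cancellation signal or deadline for everything that uses it, so if the ctx above is canceled or times out while the 8-15 second loop runs, the later write fails immediately. One possible approach, sketched below under the assumption that the write should not inherit the earlier cancellation, is to create a fresh context with its own timeout right before the write. All helper names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// canceledContext simulates a context that was canceled while a long
// loop was still running.
func canceledContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}

// writeContext returns a fresh context with its own timeout that is
// detached from any earlier cancellation.
func writeContext(timeout time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), timeout)
}

// describe reports whether ctx can still be used for a request.
func describe(ctx context.Context) string {
	if err := ctx.Err(); err != nil {
		return err.Error()
	}
	return "usable"
}

func main() {
	fmt.Println(describe(canceledContext())) // context canceled

	ctx, cancel := writeContext(3 * time.Second)
	defer cancel()
	fmt.Println(describe(ctx)) // usable
}
```

Whether detaching is appropriate depends on the application: if the original cancellation means the caller has gone away, skipping the write may be the better choice.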

How to perform FT.AGGREGATE?

I have built the om.NewJSONRepository and with it, I can perform

repo.Search(ctx, func(schema om.FtSearchIndex) om.Completed {
        return schema.Query(...).Build()
})

But that's just the FT.SEARCH command. I need the FT.AGGREGATE functionality—is there a way to do that?

Panic on `RedisResult.AsStrMap()`

My project panics when I use RedisResult.AsStrMap()

Here are the outputs:

panic: runtime error: index out of range [21] with length 21

goroutine 1 [running]:
github.com/rueian/rueidis.(*RedisMessage).AsStrMap(0xc00037f890?)
        /home/user/go/pkg/mod/github.com/rueian/[email protected]/message.go:573 +0x206
github.com/rueian/rueidis.RedisResult.AsStrMap({{0x0, 0x0}, {0x2a, {0x0, 0x0}, {0xc0001f1600, 0x15, 0x15}, 0x0, 0x0}})
        /home/user/go/pkg/mod/github.com/rueian/[email protected]/message.go:246 +0x11f
main.main()
        /home/user/working/project/main.go:122 +0xe3b
exit status 2

This is the line:
https://github.com/rueian/rueidis/blob/f2c082a144ea6b762485b9320035ea2d7f169ab2/message.go#L573

It's probably due to fixed length of the map and some rounding:

r := make(map[string]string, len(m.values)/2)

Let me know if you need more info.
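
The trace shows 21 values (0x15) and an access at index 21, which would be consistent with a flat key/value reply that has an odd number of elements; the map capacity is only a size hint and should not cause this by itself. A defensive conversion could check the length first, as in this sketch (`pairsToMap` is a hypothetical helper, not the library's implementation):

```go
package main

import "fmt"

// pairsToMap converts a flat [k1, v1, k2, v2, ...] slice into a map and
// rejects input with an odd number of elements instead of reading past the end.
func pairsToMap(values []string) (map[string]string, error) {
	if len(values)%2 != 0 {
		return nil, fmt.Errorf("expected an even number of elements, got %d", len(values))
	}
	m := make(map[string]string, len(values)/2)
	for i := 0; i < len(values); i += 2 {
		m[values[i]] = values[i+1]
	}
	return m, nil
}

func main() {
	m, err := pairsToMap([]string{"a", "1", "b", "2"})
	fmt.Println(m, err)

	_, err = pairsToMap([]string{"a", "1", "b"})
	fmt.Println(err)
}
```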
