
dskit's Introduction


The open-source platform for monitoring and observability


Grafana allows you to query, visualize, alert on and understand your metrics no matter where they are stored. Create, explore, and share dashboards with your team and foster a data-driven culture:

  • Visualizations: Fast and flexible client side graphs with a multitude of options. Panel plugins offer many different ways to visualize metrics and logs.
  • Dynamic Dashboards: Create dynamic & reusable dashboards with template variables that appear as dropdowns at the top of the dashboard.
  • Explore Metrics: Explore your data through ad-hoc queries and dynamic drilldown. Split view and compare different time ranges, queries and data sources side by side.
  • Explore Logs: Experience the magic of switching from metrics to logs with preserved label filters. Quickly search through all your logs or stream them live.
  • Alerting: Visually define alert rules for your most important metrics. Grafana will continuously evaluate and send notifications to systems like Slack, PagerDuty, VictorOps, OpsGenie.
  • Mixed Data Sources: Mix different data sources in the same graph! You can specify a data source on a per-query basis. This works even for custom data sources.

Get started

Unsure if Grafana is for you? Watch Grafana in action on play.grafana.org!

Documentation

The Grafana documentation is available at grafana.com/docs.

Contributing

If you're interested in contributing to the Grafana project:

Get involved

This project is tested with BrowserStack

License

Grafana is distributed under AGPL-3.0-only. For Apache-2.0 exceptions, see LICENSING.md.

dskit's People

Contributors

56quarters, aknuds1, bboreham, charleskorn, codesome, colega, csmarchbanks, cstyan, cyriltovena, dimitarvdimitrov, duricanikolic, francoposa, gotjosh, gouthamve, jml, joe-elliott, jtlisi, khaines, kminehart, krajorama, ortuman, pr00se, pracucci, pstibrany, replay, rfratto, simonswine, stevesg, tomwilkie, zendern


dskit's Issues

Refactor Cache interfaces

I think we can simplify dskit in the following ways:

  • Add a Delete method to Cache - if an LRU implementation doesn't support deletion, the method can be a no-op. If people feel strongly about not adding a Delete method that may be a no-op, we can split the interface into Cache and DeletableCache (a sketch of the split follows this list).
  • Remove cache.RemoteCacheClient - it's an abstraction that's only used in a handful of places.
  • Remove cache.MemcachedCache - it's a wrapper around remoteCache which doesn't do much.
  • Remove cache.RedisCache - it's a wrapper around remoteCache which doesn't do much.
  • Rename and refactor methods in cache.memcachedClient to implement cache.Cache.
  • Rename and refactor methods in cache.redisClient to implement cache.Cache.
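
For illustration, here's a minimal sketch of what the proposed split could look like, assuming illustrative method names (Store/Fetch) rather than the exact current Cache API:

package cache

import (
	"context"
	"time"
)

// Cache is the base interface; method names here are illustrative.
type Cache interface {
	// Store writes the given key/value pairs with a TTL.
	Store(ctx context.Context, data map[string][]byte, ttl time.Duration)
	// Fetch returns the values found for the given keys.
	Fetch(ctx context.Context, keys []string) map[string][]byte
}

// DeletableCache is implemented by backends that support removing keys.
// LRU-style backends that can't delete either don't implement it or
// provide a no-op Delete.
type DeletableCache interface {
	Cache
	Delete(ctx context.Context, key string) error
}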

ring/util.go WaitRingStability slow to respond

In Cortex the test TestStoreGateway_InitialSyncWithWaitRingStability/shuffle_sharding_strategy,_20_gateways,RF=3,SS=3(bucket_index_enabled=_false)
takes 30 seconds, and most of that time is spent in WaitRingStability, which is odd given that everything runs locally in the same process.

Goal: improve the responsiveness of this function if possible.

Allow custom status page templates

Issue

Upstream projects may want to render pages in different ways, applying custom branding where necessary.

Solution

Allow using custom templates when instantiating the ring or memberlist.

Add support for GetPrefix operation in dskit/kv

Both consul and etcd support atomically fetching a set of objects whose keys start with a given prefix. It would be desirable to support this in dskit/kv, as it might be useful for certain use cases.
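
As a rough illustration only (not the actual dskit API), the operation could be exposed on the kv clients along these lines:

package kv

import "context"

// PrefixClient is a hypothetical extension illustrating the proposed
// operation; the name and return types are assumptions.
type PrefixClient interface {
	// GetPrefix atomically returns all key/value pairs whose key starts
	// with the given prefix, decoded with the client's codec.
	GetPrefix(ctx context.Context, prefix string) (map[string]interface{}, error)
}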

Proposal: make memberlist TCP Transport non blocking

This proposal is based on a theory I have, but I haven't yet proved that it can significantly improve message propagation.

dskit's memberlist client propagates changes (e.g. hash ring changes) using broadcast messages:

func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) {
	l := len(message)

	b := ringBroadcast{
		key:     key,
		content: content,
		version: version,
		msg:     message,
		finished: func(b ringBroadcast) {
			m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
		},
		logger: m.logger,
	}

	m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
	m.broadcasts.QueueBroadcast(b)
}

Messages are pushed to a queue, along with messages received from other nodes that are enqueued for re-broadcasting; a single goroutine then periodically pulls messages from the queue and sends them to some other (random) nodes:
https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L604-L626

The gossip() function, responsible for picking up messages from the queue and sending them to some random nodes, is called at a regular interval, configured by -memberlist.gossip-interval (200ms by default):
https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L131-L135

For each gossip interval, the gossip() function picks up messages up to UDPBufferSize, which is hardcoded to 10MB in dskit (because dskit uses a custom transport based on TCP):

// Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet".
// As we don't use UDP for sending packets, we can use higher value here.
mlCfg.UDPBufferSize = 10 * 1024 * 1024

We assume that we broadcast new messages and re-broadcast received messages every -memberlist.gossip-interval=200ms, but that's not guaranteed: the gossip() function may take significantly longer, because dskit's custom TCP transport blocks on each call to rawSendMsgPacket(). In particular, rawSendMsgPacket() blocks for the whole time it takes the TCP transport to create a new TCP connection, write the data and close it:

func (t *TCPTransport) writeTo(b []byte, addr string) error {
// Open connection, write packet header and data, data hash, close. Simple.

For example, if a node can't be reached and the connection times out, gossiping is blocked for 5 seconds (when running with the default -memberlist.packet-dial-timeout=5s).

Proposal

The gossip() function in the memberlist library assumes a UDP transport, where sending a packet is "fire and forget" and essentially never blocks for long. dskit's TCP transport, on the other hand, blocks even though it swallows any error:

// WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors,
// but memberlist library doesn't seem to cope with that very well. That is why we return nil instead.
return time.Now(), nil

In practice, it's currently the worst of both worlds: with TCP we can detect more delivery errors at the cost of blocking the call, yet we never return the error.

I propose making the TCP transport's WriteTo() non-blocking. That preserves the current "fire and forget" behaviour without blocking the gossiping of messages to other nodes when one node is unresponsive.
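
A rough sketch of the idea, reusing the writeTo() helper quoted above and assuming the transport has a go-kit logger field; a real implementation would need a bounded queue or worker pool rather than an unbounded goroutine per packet:

func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	// Copy the payload: memberlist may reuse the buffer after WriteTo returns.
	data := make([]byte, len(b))
	copy(data, b)

	go func() {
		if err := t.writeTo(data, addr); err != nil {
			// Keep the current "swallow the error" behaviour, since memberlist
			// doesn't cope well with WriteTo errors, but at least log it.
			level.Warn(t.logger).Log("msg", "sending memberlist packet failed", "addr", addr, "err", err)
		}
	}()

	// Fire and forget, like the UDP transport memberlist expects.
	return time.Now(), nil
}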

Working examples for `ring.Ring`

Hello team ✋

I'm hoping to use the ring package to build a consistent hash ring for my project, and it would be great if dskit had working examples for this package.

Ring clients affect CAS performances on Consul

I've spent the last working day benchmarking "ring on Consul" performance. To do it, I built a tiny tool that runs X lifecyclers and Y clients (code) and ran it from multiple machines/pods against a dedicated Consul server (I tested both Consul 1.5.3 and 1.10.3, getting comparable results).

I've done several tests, many of them inconclusive, so here I'm reporting the only one that gave consistent results across many runs: ring clients (watching the ring key) affect CAS performance on Consul.

Scenario

  • 100 lifecyclers heartbeating the ring
  • 512 tokens per lifecycler
  • Lifecyclers heartbeat period = 10s
  • A variable number of clients watching the ring
  • 60s run for each test (but I've run them many times getting comparable results over time)
  • Lifecyclers and clients always running on different pods/machines
  • Dedicated Consul server (nothing else using it)

See the code to check how timing was tracked.

0 clients

level=info msg=operations CAS()=505 datasize(bytes)=259370
level=info msg=consul.Get() avg=1.846022ms min=1.323458ms max=8.913187ms
level=info msg=consul.CAS() avg=9.077085ms min=6.96226ms max=25.035527ms
level=info msg=client.CAS() avg=24.952575ms min=20.455846ms max=45.214343ms retries=5 conflicts=5

1 client

level=info msg=operations CAS()=505 datasize(bytes)=259379
level=info msg=consul.Get() avg=1.844928ms min=1.323242ms max=4.322768ms
level=info msg=consul.CAS() avg=9.348299ms min=7.261846ms max=18.869644ms
level=info msg=client.CAS() avg=25.142215ms min=20.195086ms max=38.79353ms retries=5 conflicts=5

400 clients

level=info msg=operations CAS()=674 datasize(bytes)=259394
level=info msg=consul.Get() avg=11.81947ms min=1.180597ms max=642.563055ms
level=info msg=consul.CAS() avg=29.351135ms min=6.880116ms max=1.690589329s
level=info msg=client.CAS() avg=64.645022ms min=20.237888ms max=1.710288901s retries=174 conflicts=180

Summary:

  • With 400 clients, the client.CAS() average goes from 25ms to 65ms, and the maximum from ~45ms to 1.7s.
  • The client.CAS() average timing effectively caps the successful CAS QPS we can get in the best case (e.g. if the avg is 65ms, we can't successfully CAS more than ~15 times/sec).
  • The longer client.CAS() takes, the more conflicts we get, but that's expected.

Experiment: introduce a slow down in clients

Hypothesis: clients are watching a key and all receive the update at the same time. Then they all send another "get" request to Consul at nearly the same time. We're effectively DoS-ing Consul in short bursts happening every second (because of the rate limit we configure in our Consul client wrapper).

Experiment: I tried introducing a random delay after the "get" request returns in WatchKey(), to slow down subsequent requests to Consul (see commented code).

Result: introducing a delay of a few seconds can help a bit, but lower values don't.

400 clients / with time.Sleep(dstime.DurationWithJitter(500*time.Millisecond, 1))

level=info msg=operations CAS()=884 datasize(bytes)=251748
level=info msg=consul.Get() avg=22.734101ms min=1.230174ms max=859.036878ms
level=info msg=consul.CAS() avg=69.33508ms min=7.465628ms max=1.54668955s
level=info msg=client.CAS() avg=123.077636ms min=20.635579ms max=2.303539771s retries=398 conflicts=419

400 clients / with time.Sleep(dstime.DurationWithJitter(2*time.Second, 1))

level=info msg=operations CAS()=521 datasize(bytes)=259338
level=info msg=consul.Get() avg=2.104879ms min=1.19876ms max=23.720245ms
level=info msg=consul.CAS() avg=11.232828ms min=7.334579ms max=322.944766ms
level=info msg=client.CAS() avg=29.64897ms min=20.379623ms max=402.913091ms retries=21 conflicts=21

Experiment: how ring size (bytes) affects performances

I've run the same benchmark with 1 token per instance instead of 512. This reduces the ring size from about 250KB to less than 1KB. Performance is better, but watching clients still affect CAS performance.

Idea: introduce a proxy

Another idea I had (but didn't try, for lack of time in my timeboxed test) is to introduce a caching proxy in front of the Consul API. The proxy would speak the Consul API and pass all requests through to Consul except "get" requests.

Get requests are de-multiplexed by the proxy: it watches each key on Consul only once, keeps the latest value in memory, and then serves clients from the (possibly stale) in-memory copy (honoring the index version specified in the request).

Alternatives

In any case, heartbeating the ring on Consul can't scale much, because the max QPS of successful CAS operations is bounded by the time client.CAS() takes (which includes getting the updated key from Consul + decode + update ring data structure + encode + CAS back to Consul). That's the main reason why we've built memberlist support.

Make `flagext` pkg consistent across different projects

Currently Loki uses its own version of the flagext package.

Should we also bring this into dskit?

One of my personal rationales is that it would make entitlement system validation simpler by having it in a single place, e.g. I could implement custom JSON unmarshalling for these types in one place.
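
For example, one-place custom JSON unmarshalling could look roughly like this (Secret here is a stand-in type, not necessarily the real flagext.Secret API):

package flagext

import "encoding/json"

// Secret is a stand-in type used only to illustrate the idea.
type Secret struct {
	Value string
}

// UnmarshalJSON accepts the secret as a plain JSON string.
func (s *Secret) UnmarshalJSON(data []byte) error {
	var v string
	if err := json.Unmarshal(data, &v); err != nil {
		return err
	}
	s.Value = v
	return nil
}

// MarshalJSON hides the secret value when the config is dumped.
func (s Secret) MarshalJSON() ([]byte, error) {
	return json.Marshal("********")
}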

Race condition in BasicLifecycler

There's a small race condition in BasicLifecycler where concurrent calls to updateInstance can cause a lost update if the ordering of calls changes between l.store.CAS and the update of l.currInstanceDesc. Some approaches for guarding against this are:

  • Pessimistic - We can broaden the currState.Lock() to include the CAS call.
  • Optimistic - We can store a currVersion along with currInstanceDesc, update CAS to return the new version, and only update currInstanceDesc if the new version is > currVersion.

We should also consider whether other methods that update l.currInstanceDesc should be similarly guarded.
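
A rough sketch of the optimistic option, using hypothetical fields and a heavily simplified InstanceDesc (the real BasicLifecycler and InstanceDesc are richer than this):

package ring

import "sync"

// InstanceDesc is heavily simplified here for illustration.
type InstanceDesc struct {
	Addr  string
	State int32
}

type BasicLifecycler struct {
	currState        sync.Mutex
	currInstanceDesc *InstanceDesc
	currVersion      int64 // hypothetical version returned by the CAS
}

// setInstanceDesc applies an update only if it carries a newer version,
// so a slower concurrent updateInstance can't overwrite a newer state.
func (l *BasicLifecycler) setInstanceDesc(desc *InstanceDesc, version int64) {
	l.currState.Lock()
	defer l.currState.Unlock()

	if version > l.currVersion {
		l.currInstanceDesc = desc
		l.currVersion = version
	}
}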

Change Consul ACLToken to flagext.Secret

Some downstream projects display their config and use flagext.Secret to hide sensitive data. Consul's ACLToken is also sensitive, so we should change it to the flagext.Secret type.

KVConfig.MetricsRegisterer looks like an anti-pattern

The KVConfig defined in kv/memberlist/memberlist_client.go includes MetricsRegisterer prometheus.Registerer:

// Where to put custom metrics. Metrics are not registered, if this is nil.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`

This looks like an anti-pattern to me. The registerer is typically passed to the NewXXX() function. In this case, I'm wondering if we could remove the registerer from the config and take it as an input to the NewKV() function.

Is there any reason not to do it?

Crash immediately when critical services cannot start.

We should be able to mark services as critical. If such a service fails to start, the whole startup process should fail immediately rather than reporting a dependency failure. Ideally the exit indicates which service failed.

Make GetInstanceAddr work on IPv6

GetInstanceAddr() calls getFirstAddressOf(), which calls filterIPs(), which only returns IPv4 addresses.

I don't have any pressing need for this improvement; just seemed like something we will have to do some day.
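
As a sketch of the change (the real filterIPs() differs), a filter that keeps both IPv4 and routable IPv6 addresses could look like:

package netutil

import "net"

// filterUsableIPs keeps IPv4 addresses and global unicast IPv6 addresses,
// skipping loopback and link-local ones.
func filterUsableIPs(addrs []net.Addr) []net.IP {
	var out []net.IP
	for _, a := range addrs {
		ipNet, ok := a.(*net.IPNet)
		if !ok {
			continue
		}
		ip := ipNet.IP
		if ip.To4() != nil || (ip.IsGlobalUnicast() && !ip.IsLoopback()) {
			out = append(out, ip)
		}
	}
	return out
}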

[multikv] application is shut down when secondary store(memberlist) failed to start

In our case, we use a primary store (consul) and a secondary store (memberlist). When the app establishes a connection with consul, the service becomes Ready; if the memberlist module later fails to start, the whole application is shut down with messages like:

level=error ts=2022-07-05T09:25:39.8466124Z caller=memberlist_client.go:466 msg="failed to join memberlist cluster" err="1 error occurred:\n\t
level=info ts=2022-07-05T09:25:39.8468695Z caller=memberlist_client.go:591 msg="leaving memberlist cluster"
level=warn ts=2022-07-05T09:25:39.847216Z caller=module_service.go:112 msg="module failed with error" module=memberlist-kv err="service &{0xc0
level=error ts=2022-07-05T09:25:39.847582Z caller=loki.go:382 msg="module failed" module=memberlist-kv error="service &{0xc00097e8c0 { true 10
level=info ts=2022-07-05T09:25:39.8477152Z caller=basic_lifecycler.go:202 msg="ring lifecycler is shutting down" ring=admin-api
level=info ts=2022-07-05T09:25:39.8484468Z caller=compactor.go:369 msg="compactor exiting"
level=info ts=2022-07-05T09:25:39.8485662Z caller=basic_lifecycler.go:202 msg="ring lifecycler is shutting down" ring=compactor
level=info ts=2022-07-05T09:25:39.8477701Z caller=manager.go:238 msg="stopping user managers"
level=info ts=2022-07-05T09:25:39.8490085Z caller=manager.go:252 msg="all user managers stopped"

Actual result: the service is stopped if the secondary store (memberlist) does not work.
Expected result: the service continues working as long as the primary store works.

etcd mock client depends on etcd server packages

kv/etcd/mock.go depends on etcd server packages. The server package in turn pulls in go.opentelemetry.io/otel packages. This complicates using dskit in Tempo, which pulls in conflicting versions of these otel packages.

Example from trying to use newer versions of dskit in Tempo:

$ go mod vendor
go: downloading github.com/google/flatbuffers v2.0.0+incompatible
go: downloading github.com/go-test/deep v1.0.7
go: finding module for package go.opentelemetry.io/otel/semconv
github.com/grafana/tempo/pkg/ring imports
        github.com/grafana/dskit/kv imports
        github.com/grafana/dskit/kv/etcd imports
        go.etcd.io/etcd/server/v3/embed imports
        go.opentelemetry.io/otel/semconv: package go.opentelemetry.io/otel/semconv provided by go.opentelemetry.io/otel at latest version v0.20.0 but not at required version v1.0.0-RC2

A library that aims to be lightweight, like dskit, shouldn't be pulling in otel packages. We should refactor the mock etcd client (which we only use for unit tests) to avoid pulling in the etcd server.

Race in `moduleService` wrapper

The moduleService wrapper is responsible for waiting until all dependencies have started, and then starting the wrapped service. It does so by calling

err := w.service.StartAsync(context.Background())

and then waiting for the service to enter the Running state:

return w.service.AwaitRunning(serviceContext)

However, if the wrapped service is so fast that it terminates before the AwaitRunning call, AwaitRunning returns the error invalid service state: Terminated, expected: Running.

We can solve this by starting AwaitRunning before StartAsync (in a separate goroutine), or by adding a listener to the service and waiting for any transition away from the Starting state.
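
A rough sketch of the first option, using only the StartAsync and AwaitRunning calls mentioned above and assuming AwaitRunning observes the Running transition when it is already waiting; the real moduleService wrapper does more (dependency waiting, stopping, etc.):

package modules

import (
	"context"

	"github.com/grafana/dskit/services"
)

func startAndAwaitRunning(ctx context.Context, s services.Service) error {
	// Begin waiting before the service is started, so we can observe the
	// Running state even if the service terminates very quickly.
	done := make(chan error, 1)
	go func() {
		done <- s.AwaitRunning(ctx)
	}()

	if err := s.StartAsync(context.Background()); err != nil {
		return err
	}
	return <-done
}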

Rename `ingester` references inside dskit/ring package

Not a big concern, but I noticed that dskit/ring mentions ingester everywhere. I think changing these mentions to something more generic would be beneficial, because at first glance it isn't clear whether that code applies to all rings or just to the ingester one.

etcd client set log

The etcd client's logger can't be configured, so the etcd client writes its log output to stderr. This may cause SIGPIPE problems when running under systemd. Could the logging be made configurable?

Flaky `TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy`

The test failed in this run, but succeeded in the retry. Revision was f73a2db

--- FAIL: TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy (88.81s)
    --- FAIL: TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy/num_instances_=_9,_num_zones_=_3,_update_oldest_registered_timestamp_=_true (0.02s)
        ring_test.go:1840: random generator seed: 1663955757664614088
        ring_test.go:1958: subring generated after event 4 is expected to include instance instance-6 from ring state at time 2022-09-23 19:11:57.669186493 +0000 UTC m=+4673.344713179 but it's missing (actual instances are: instance-9, instance-10, instance-2)

Inefficient TCP connections use by memberlist transport

We use a custom transport for memberlist, based on the TCP protocol. The main reason we use TCP is to be able to transfer messages bigger than the maximum payload of a UDP packet (typically slightly less than 64KB).

Currently, the TCP transport is implemented inefficiently with regard to TCP connection establishment: for every single packet a node needs to transfer to another node, the implementation creates a new TCP connection, writes the packet and then closes the connection. See:

func (t *TCPTransport) writeTo(b []byte, addr string) error {

We should consider alternatives like:

  • Pros/cons of keeping long-lived TCP connections between nodes, and multiplexing multiple packets over the same connection
  • Using a mix of UDP and TCP, selecting the protocol based on the message size (in this case, TLS support wouldn't be available)

Ring lifecycler stuck when increasing tokens and instance is already in the ring

I've found a bug in the ring.Lifecycler which happens in the following scenario:

  • Ingester is already registered in the ring with 64 tokens and LEAVING state
  • Ingester is configured to store/load tokens from disk
  • Ingester starts up with a tokens config change (from 64 to 128)

What we get is that the ingester never increases its tokens to 128 and never switches to ACTIVE in the ring.

The issue should be easy to reproduce with a unit test.

ring: Members of the ring stay `unhealthy` after leaving the ring.

Problem

Recently we started seeing lots of "unhealthy" instances in the distributor ring in Loki clusters during normal rollouts. These "unhealthy" members are the ones that have already left the ring but failed to unregister themselves from the underlying KV store (consul in this case).

I did some investigation and would like to record the behavior here.

Investigation

So what happens when a member of the ring leaves?

It needs to unregister() itself from the underlying KV store, which basically means removing its instance-id from the shared key value called collectors/distributor (or another key, depending on the component).

This key value collectors/distributor is shared among all the members of the ring, and every member needs to update it before successfully leaving the ring.

This shared access to collectors/distributor is provided by something called CAS access. It provides a synchronization mechanism for the key value shared by multiple members of the ring.

The way CAS (Check-And-Set) works is based on the current ModifyIndex, metadata about the key value you are trying to set (or update).

This index is a monotonically increasing counter that gets updated every time any member updates this particular kv.

The CAS operation is similar to a PUT operation, except that it takes an additional param called cas, an integer representing the ModifyIndex of the key value you are trying to update.

Two cases.

  1. If cas is set to 0 (or left empty), the kv is updated only if it doesn't already exist.
  2. If cas is set to a non-zero value, the kv is updated only if that cas value matches the current ModifyIndex of the kv.

e.g. (I ran this in our clusters):

curl -s http://127.0.0.1:8500/v1/kv/collectors/distributor | jq .[0].ModifyIndex
8149486

Here the ModifyIndex is 8149486. If I try to CAS with a lesser value, it returns false.

curl -s --request PUT --data 10 http://127.0.0.1:8500/v1/kv/collectors/distributor?cas=100 && echo
false

This is what's happening in our distributor ring. Some LEAVING distributors try a maximum of 10 times to set the value with the ModifyIndex they got previously. But meanwhile some other distributor has updated it, making that ModifyIndex out of date, so these distributors run out of retries.

Proposal

The ideal solution is a bit tricky, as we need some kind of synchronisation on the same key value collectors/distributor shared by all the members of the ring, which is what ModifyIndex gives us anyway.

So tweaking the MaxCasRetry (and/or CasRetryDelay) configs would be a good option IMHO. Unfortunately, dskit currently doesn't let you tweak these retry arguments, but I think we can make them configurable. Setting the retry count to the number of replicas always resulted in no unhealthy members (tested with even 100 distributor replicas).

So I think it's better to:

  1. Make the flags configurable
  2. Document this behavior in the ring and kv packages.

NOTES

  1. In case CAS returns false (because of an unmatched ModifyIndex), it is treated as a non-error case, e.g. this is handled in the dskit code here.

  2. The default of 10 retries may work for the ingester (it has an expensive and time-consuming graceful shutdown process), so most likely one of the CAS retries will succeed. But that's not the case for the distributor: it can shut down almost instantly, so there's a high chance of lots of distributors leaving the ring and accessing the shared key value at the same time.

  3. While investigating, I also found that the error returned by the unregister operation is lost if the ring is wrapped in a BasicService and the service doesn't have any listeners.

  4. CAS doc from consul website.

Any thoughts, suggestions or criticisms are welcome!

Add sentinel errors for the limiter package

The limiter#Limiter returns two kinds of errors from WaitN():

return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)

In the case of an HTTP request limit, I would ideally return a 429 Too Many Requests for each, but other errors can also be returned, like a context cancellation. It would be convenient to have error types exposed in the package API that can be used with errors.As/errors.Is to check whether the limiter rejected the request.
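
A minimal sketch of what sentinel errors could look like for the two cases above; the names are illustrative, not an actual dskit API:

package limiter

import (
	"errors"
	"fmt"
)

var (
	// ErrBurstExceeded would be returned (wrapped) when n exceeds the burst.
	ErrBurstExceeded = errors.New("rate: request exceeds limiter's burst")
	// ErrDeadlineExceeded would be returned (wrapped) when the wait cannot
	// complete before the context deadline.
	ErrDeadlineExceeded = errors.New("rate: request would exceed context deadline")
)

func burstExceededError(n, burst int) error {
	// %w keeps the detailed message while allowing errors.Is() checks.
	return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d: %w", n, burst, ErrBurstExceeded)
}

func deadlineExceededError(n int) error {
	return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline: %w", n, ErrDeadlineExceeded)
}

An HTTP handler could then map errors.Is(err, ErrBurstExceeded) or errors.Is(err, ErrDeadlineExceeded) to a 429, and fall back to other status codes for the remaining error cases.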

Flaky `TestMultipleClientsWithSameLabelWithClusterLabelVerification`

The test failed the first run, but succeeded the second. Revision is 126e8a8

--- FAIL: TestMultipleClientsWithSameLabelWithClusterLabelVerification (24.37s)
    memberlist_client_test.go:672: Waiting before start
    memberlist_client_test.go:680: Observing ring ...
    memberlist_client_test.go:694: Update 68.522298ms : Ring has 2 members, and 256 tokens, oldest timestamp: 825.359625ms avg timestamp: 825.359625ms youngest timestamp: 825.359625ms
    memberlist_client_test.go:694: Update 153.817701ms : Ring has 3 members, and 384 tokens, oldest timestamp: 910.655027ms avg timestamp: 910.655027ms youngest timestamp: 910.655027ms
    memberlist_client_test.go:694: Update 459.555858ms : Ring has 4 members, and 512 tokens, oldest timestamp: 1.216393192s avg timestamp: 1.216393192s youngest timestamp: 1.216393192s
    memberlist_client_test.go:694: Update 460.579224ms : Ring has 5 members, and 640 tokens, oldest timestamp: 1.217416552s avg timestamp: 1.217416552s youngest timestamp: 1.217416552s
    memberlist_client_test.go:694: Update 461.713676ms : Ring has 6 members, and 768 tokens, oldest timestamp: 1.218551007s avg timestamp: 1.218551007s youngest timestamp: 1.218551007s
    memberlist_client_test.go:694: Update 463.242127ms : Ring has 7 members, and 896 tokens, oldest timestamp: 1.220079455s avg timestamp: 1.220079455s youngest timestamp: 1.220079455s
    memberlist_client_test.go:694: Update 503.895559ms : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.260732883s avg timestamp: 2.260732883s youngest timestamp: 1.260732883s
    memberlist_client_test.go:694: Update 558.913038ms : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.315750343s avg timestamp: 2.315750343s youngest timestamp: 1.315750343s
    memberlist_client_test.go:694: Update 1.002496549s : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.759333874s avg timestamp: 1.759333874s youngest timestamp: 759.333874ms
    memberlist_client_test.go:694: Update 1.084007026s : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.840844352s avg timestamp: 1.840844352s youngest timestamp: 840.844352ms
    memberlist_client_test.go:694: Update 1.13045055s : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.887287876s avg timestamp: 1.887287876s youngest timestamp: 887.287876ms
    memberlist_client_test.go:694: Update 1.136422462s : Ring has 10 members, and 1280 tokens, oldest timestamp: 2.89325979s avg timestamp: 1.89325979s youngest timestamp: 893.25979ms
    memberlist_client_test.go:694: Update 1.258210064s : Ring has 10 members, and 1280 tokens, oldest timestamp: 3.015047392s avg timestamp: 2.015047392s youngest timestamp: 1.015047392s
    memberlist_client_test.go:694: Update 1.278908586s : Ring has 10 members, and 1280 tokens, oldest timestamp: 3.035745902s avg timestamp: 2.035745902s youngest timestamp: 1.035745902s
    memberlist_client_test.go:702: Ring updates observed: 14
    memberlist_client_test.go:723: KV 0: number of known members: 10
    memberlist_client_test.go:620: 
        	Error Trace:	memberlist_client_test.go:620
        	Error:      	Received unexpected error:
        	            	Member 0: invalid state of member Member-9 in the ring: 2 
        	Test:       	TestMultipleClientsWithSameLabelWithClusterLabelVerification

Ring members return as unhealthy and stuck the cluster

Problem

On Loki (and apparently in other projects) we're facing this weird scenario where ring members show up as unhealthy, and once this happens to more than one member, the ring gets stuck (maybe because we're running with replication_factor = 3?).

What we know so far is:

  • Clicking manually to forget the unhealthy members does fix the issue
  • I don't think we ever faced this with typical healthy rollouts; I think all the occasions it happened were when pods were deleted or OOMKilled, or the k8s node hosting the pods had an outage, so maybe it's related to our heartbeat logic
  • For Loki, this happens for both ingester members and distributor members, so it isn't related to a specific component of the project
  • The typical log line we use to identify this behavior is the one below (for Loki):
level=warn ts=2022-07-28T17:32:28.308736477Z caller=grpc_logging.go:43 method=/httpgrpc.HTTP/Handle duration=179.426µs err="rpc error: code = Code(500) desc = at least 2 live replicas required, could only find 1 - unhealthy instances: x.x.x.x:9095,y.y.y.y:9095\n" msg=gRPC

This is probably not enough detail to track down what is wrong, so if this happens again I'll make sure to grab a memory dump and other things that might help.

Migrate metric test helpers from Mimir

Mimir has a couple of test helper methods for testing emitted metrics that would be good to migrate to dskit:

I was wondering if we could reuse the utilities from the metrics package, but I don't think we can out of the box. Not high priority, but I think in Mimir we have most of the missing utilities ready and we could move them to dskit:

  • findHistogramMatchingLabels() from pkg/frontend/querymiddleware/codec_json_test.go
  • getMetricsMatchingLabels() from pkg/storegateway/bucket_e2e_test.go
  • MatchesSelectors() from pkg/util

From https://github.com/grafana/dskit/pull/344/files#r1285937024

"unsupported value type" in netutil log

I've seen this log:

level=debug ts=2022-05-17T02:01:28.39900695Z caller=netutil.go:55 msg="found network interfaces with private IP addresses assigned" interfaces="unsupported value type"

The interfaces field value can't be stringified, so the log just displays a useless "unsupported value type". We should fix it.
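
A minimal sketch of one possible fix, assuming the code currently passes a slice of net.Interface values straight to the logger; joining the interface names into a single string gives the logfmt encoder something it can print:

package netutil

import (
	"net"
	"strings"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func logPrivateInterfaces(logger log.Logger, ifaces []net.Interface) {
	names := make([]string, 0, len(ifaces))
	for _, i := range ifaces {
		names = append(names, i.Name)
	}
	level.Debug(logger).Log(
		"msg", "found network interfaces with private IP addresses assigned",
		"interfaces", strings.Join(names, ","),
	)
}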

flagext needs tests

For each kind of flag we should check that valid strings are correctly parsed, and some invalid strings are rejected.
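
A sketch of the kind of table-driven helper this could use; it relies only on the standard flag.Value interface, so each concrete flagext type can be plugged in per test:

package flagext_test

import (
	"flag"
	"testing"
)

// testFlagValue checks that every valid input parses and every invalid
// input is rejected, for any type implementing flag.Value.
func testFlagValue(t *testing.T, newValue func() flag.Value, valid, invalid []string) {
	t.Helper()

	for _, in := range valid {
		if err := newValue().Set(in); err != nil {
			t.Errorf("Set(%q) returned unexpected error: %v", in, err)
		}
	}
	for _, in := range invalid {
		if err := newValue().Set(in); err == nil {
			t.Errorf("Set(%q) should have been rejected", in)
		}
	}
}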

consider consul with TLS

Hi,

I'm currently trying to setup loki with consul.

As you use consul.NewClient but pass in a custom HTTP client,

Consul ignores its environment variables (CONSUL_CACERT, CONSUL_CLIENT_CERT, CONSUL_CLIENT_KEY) that point to the TLS parts: https://github.com/hashicorp/consul/blob/main/api/api.go#L706

It looks like the only reason you pass in a custom client is to use the clean HTTP transport and the timeout.

The transport can be passed in separately: https://github.com/hashicorp/consul/blob/main/api/api.go#L678

The timeout could then be set afterwards.

So instead of

client, err := consul.NewClient(&consul.Config{
	Address: cfg.Host,
	Token:   cfg.ACLToken.String(),
	Scheme:  "http",
	HttpClient: &http.Client{
		Transport: cleanhttp.DefaultPooledTransport(),
		// See https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
		Timeout: cfg.HTTPClientTimeout,
	},
})

Something like this:

config := &consul.Config{
	Address:   cfg.Host,
	Token:     cfg.ACLToken.String(),
	Scheme:    "http",
	Transport: cleanhttp.DefaultPooledTransport(),
}
client, err := consul.NewClient(config)

// not nice, but should work I guess :)
config.HttpClient.Timeout = cfg.HTTPClientTimeout

Ring metrics updated slowly

This is a sort of regression, since PR #50 removed collecting metrics directly. Readers of metrics, for example e2e tests, now have to wait up to 10s (hard-coded) to see updated metrics. This means a big slowdown in tests and a possible loss of information.

Expose Memberlist secret configuration

Expose the memberlist secret configuration, so a cluster-unique secret key can be used to prevent different clusters from joining each other. The secret key is typically derived by hashing a cluster name.
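
A minimal sketch of how such a key could be derived and applied, assuming direct access to the underlying memberlist.Config (which has a SecretKey field); how dskit should expose this is exactly what this issue is asking for:

package main

import (
	"crypto/sha256"

	"github.com/hashicorp/memberlist"
)

// configureClusterSecret derives a 32-byte key from the cluster name;
// memberlist accepts 16, 24 or 32 bytes (AES-128/192/256).
func configureClusterSecret(cfg *memberlist.Config, clusterName string) {
	key := sha256.Sum256([]byte(clusterName))
	cfg.SecretKey = key[:]
}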

When lifecycler reuses existing LEAVING entry, it waits for another heartbeat period to write ACTIVE state to ring

When the lifecycler finds an existing entry in the ring, it reuses the state and tokens from it. If the state was LEAVING and the entry has enough tokens, the lifecycler immediately switches its internal state to ACTIVE, but it doesn't update the ring with this state and instead waits for the next heartbeat.

With long heartbeats (e.g. 2 minutes), this lengthens the time until the lifecycler's ACTIVE state becomes visible to other components.

This also affects how soon the lifecycler reports "ready" in its CheckReady function, because that actually checks the state in the ring. (This is correct behaviour: if it only consulted its internal state, we could report "ready" before other components saw the instance as active, which could result in an outage in the case of Mimir ingesters, for example.)

DayValue gives incorrect results when run in non-UTC environment

When running unit tests locally for DayValue, they fail and appear to be off by one day.

--- FAIL: TestDayValueYAML (0.00s)
    day_test.go:25: 
                Error Trace:    day_test.go:25
                Error:          Not equal: 
                                expected: []byte{0x64, 0x61, 0x79, 0x3a, 0x20, 0x22, 0x31, 0x39, 0x38, 0x35, 0x2d, 0x30, 0x36, 0x2d, 0x30, 0x32, 0x22, 0xa}
                                actual  : []byte{0x64, 0x61, 0x79, 0x3a, 0x20, 0x22, 0x31, 0x39, 0x38, 0x35, 0x2d, 0x30, 0x36, 0x2d, 0x30, 0x31, 0x22, 0xa}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,3 @@
                                 ([]uint8) (len=18) {
                                - 00000000  64 61 79 3a 20 22 31 39  38 35 2d 30 36 2d 30 32  |day: "1985-06-02|
                                + 00000000  64 61 79 3a 20 22 31 39  38 35 2d 30 36 2d 30 31  |day: "1985-06-01|
                                  00000010  22 0a                                             |".|
                Test:           TestDayValueYAML
    day_test.go:47: 
                Error Trace:    day_test.go:47
                Error:          Not equal: 
                                expected: []byte{0x64, 0x61, 0x79, 0x3a, 0x20, 0x22, 0x31, 0x39, 0x38, 0x35, 0x2d, 0x30, 0x36, 0x2d, 0x30, 0x32, 0x22, 0xa}
                                actual  : []byte{0x64, 0x61, 0x79, 0x3a, 0x20, 0x22, 0x31, 0x39, 0x38, 0x35, 0x2d, 0x30, 0x36, 0x2d, 0x30, 0x31, 0x22, 0xa}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,3 @@
                                 ([]uint8) (len=18) {
                                - 00000000  64 61 79 3a 20 22 31 39  38 35 2d 30 36 2d 30 32  |day: "1985-06-02|
                                + 00000000  64 61 79 3a 20 22 31 39  38 35 2d 30 36 2d 30 31  |day: "1985-06-01|
                                  00000010  22 0a                                             |".|
                Test:           TestDayValueYAML

Local machine:

$ timedatectl 
               Local time: Wed 2021-08-25 19:21:47 EDT
           Universal time: Wed 2021-08-25 23:21:47 UTC
                 RTC time: Wed 2021-08-25 23:21:47
                Time zone: America/New_York (EDT, -0400)
System clock synchronized: yes
              NTP service: active
          RTC in local TZ: no
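
A small standalone illustration of the underlying problem (not dskit code): formatting the same instant in a non-UTC zone can yield the previous calendar day, which matches the off-by-one above when the tests run under America/New_York:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Midnight UTC on 1985-06-02.
	t := time.Date(1985, time.June, 2, 0, 0, 0, 0, time.UTC)

	loc, err := time.LoadLocation("America/New_York")
	if err != nil {
		panic(err)
	}

	fmt.Println(t.UTC().Format("2006-01-02"))   // 1985-06-02
	fmt.Println(t.In(loc).Format("2006-01-02")) // 1985-06-01 (EDT is UTC-4)
}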

Ring client pool should not execute health checks sequentially

The ring client pool executes health checks against its target addresses sequentially; see dskit/ring/client/pool.go, lines 178–188 at 0bff37e:

for _, addr := range p.RegisteredAddresses() {
	client, ok := p.fromCache(addr)
	// not ok means someone removed a client between the start of this loop and now
	if ok {
		err := healthCheck(client, p.cfg.HealthCheckTimeout)
		if err != nil {
			level.Warn(p.logger).Log("msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), "addr", addr, "reason", err)
			p.RemoveClientFor(addr)
		}
	}
}

This can be a serious problem when there are hundreds of target addresses to check and you still want a short health check interval with a reasonably long timeout, e.g. a 10s interval and a 1s timeout. With e.g. 500 target addresses, a single request may take 20ms, so all 500 requests can finish within the 10s interval; under normal load that should not be a problem. However, such a configuration only leaves room for 10 requests to time out before the next interval is missed (it would be skipped). That could delay the next health check significantly, which could lead to clients trying to connect to unhealthy targets.

I propose to parallelize the health check execution.
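
A rough sketch of one way to do that with a bounded number of concurrent checks, reusing the names from the quoted snippet (RegisteredAddresses, fromCache, healthCheck, RemoveClientFor); imports and the Pool type are assumed from the surrounding file, and the real implementation may differ:

func (p *Pool) cleanUnhealthyConcurrently(concurrency int) {
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

	for _, addr := range p.RegisteredAddresses() {
		addr := addr

		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer func() { <-sem; wg.Done() }()

			client, ok := p.fromCache(addr)
			// not ok means someone removed the client in the meantime.
			if !ok {
				return
			}
			if err := healthCheck(client, p.cfg.HealthCheckTimeout); err != nil {
				level.Warn(p.logger).Log("msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), "addr", addr, "reason", err)
				p.RemoveClientFor(addr)
			}
		}()
	}
	wg.Wait()
}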

Connection pool closes active connection if health-check is failed

Problem:
The connection pool runs a health check for each connection in the background, and if the health check completes with an error, it closes that connection (the logic is implemented here).

However, this leads to race conditions, because the connection might be in use by another goroutine that will also try to close it after reading the response from the server; if the connection pool has already closed the connection, that goroutine receives the error: rpc error: code = Canceled desc = grpc: the client connection is closing.

This issue was found during the investigation grafana/loki#4991

Possible solutions:

  1. We could add retries to the health check logic, so the connection is not closed after the first failed attempt, retrying a few more times to make sure the connection is really unhealthy (a sketch of this option follows the list).
  2. We could check whether the connection is in use by another goroutine and, if so, wait until that goroutine releases the connection.
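
A minimal sketch of option 1, assuming a healthCheck(client, timeout) helper like the one the pool already uses and a hypothetical PoolClient type; the retry count and pause are illustrative:

func healthCheckWithRetries(client PoolClient, timeout time.Duration, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = healthCheck(client, timeout); err == nil {
			return nil
		}
		// Short pause between attempts so a transient blip doesn't cause
		// an otherwise healthy connection to be closed.
		time.Sleep(100 * time.Millisecond)
	}
	return err
}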

Creating multiple KV clients within the same process panics

Because the metrics registration is done inside each client and there's no way to pass in your own metrics struct.

Even though it's not known in advance which metrics to create, since the type of client is decided later during the NewClient call.

This seems like a big refactoring; for now I'm discarding all metrics for my client, but I'd like to gather ideas on how to solve this.

Example of panic:

	/Users/ctovena/go/src/github.com/grafana/dskit/vendor/github.com/prometheus/client_golang/prometheus/promauto/auto.go:362 +0xf5
github.com/grafana/dskit/kv.newMetricsClient({0x1999d9c, 0x8}, {0x1ac3030, 0xc0002b61b0}, {0x1ab1d20, 0xc0002fc000})
	/Users/ctovena/go/src/github.com/grafana/dskit/kv/metrics.go:51 +0x1d5
github.com/grafana/dskit/kv.createClient({_, _}, {_, _}, {{{0x0, 0x0}, {0x0, 0x0}, 0x0, 0x0, ...}, ...}, ...)
	/Users/ctovena/go/src/github.com/grafana/dskit/kv/client.go:188 +0x6f9
github.com/grafana/dskit/kv.NewClient({{0x1999d9c, 0x8}, {0x0, 0x0}, {{{0x0, 0x0}, {0x0, 0x0}, 0x0, 0x0, ...}, ...}, ...}, ...)
	/Users/ctovena/go/src/github.com/grafana/dskit/kv/client.go:131 +0x13c

But obviously this would happen for all metrics.
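
As a possible workaround sketch (not a fix for the underlying design issue), each KV client could be given its own label on the registerer so the duplicated registrations don't collide; prometheus.WrapRegistererWith is part of the standard Prometheus client library:

package kv

import "github.com/prometheus/client_golang/prometheus"

// registererForKVClient returns a registerer that adds a per-client label,
// so two KV clients in the same process register distinct metric series.
func registererForKVClient(reg prometheus.Registerer, clientName string) prometheus.Registerer {
	return prometheus.WrapRegistererWith(prometheus.Labels{"kv_client": clientName}, reg)
}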

Empty ring right after startup when using memberlist

In Mimir, we occasionally see an "empty ring" error right after a process startup (e.g. querier). It's an issue that started after the migration to memberlist.

Possible root cause

I think the issue is caused by the ring client implementation not guaranteeing that the initial ring state is fetched before the service switches to the Running state. Below I share some thoughts about the code.

The ring client service is expected to switch to the Running state only after it has initialized its internal state with the ring data structure. This is why it calls r.KVClient.Get() in Ring.starting():

dskit/ring/ring.go, lines 252–256 at e441b77:

func (r *Ring) starting(ctx context.Context) error {
	// Get the initial ring state so that, as soon as the service will be running, the in-memory
	// ring would be already populated and there's no race condition between when the service is
	// running and the WatchKey() callback is called for the first time.
	value, err := r.KVClient.Get(ctx, r.key)

When using Consul or etcd as the backend, r.KVClient.Get() is guaranteed to return the state of the ring, but I think this guarantee has been lost in the memberlist implementation, which could return a zero-value data structure.

The memberlist client Get() is implemented here:

func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return nil, err
	}
	return c.kv.Get(key, c.codec)
}

It waits until the backend KV client is running. But does waiting for it to be running guarantee that the ring data structure has been populated? I don't think so.

The memberlist KV.starting() just initialise memberlist but doesn't join the cluster:

func (m *KV) starting(_ context.Context) error {
	mlCfg, err := m.buildMemberlistConfig()
	if err != nil {
		return err
	}

	// Wait for memberlist and broadcasts fields creation because
	// memberlist may start calling delegate methods if it
	// receives traffic.
	// See https://godoc.org/github.com/hashicorp/memberlist#Delegate
	//
	// Note: We cannot check for Starting state, as we want to use delegate during cluster joining process
	// that happens in Starting state.
	m.initWG.Add(1)
	list, err := memberlist.Create(mlCfg)
	if err != nil {
		return fmt.Errorf("failed to create memberlist: %v", err)
	}

	// Finish delegate initialization.
	m.memberlist = list
	m.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes:       list.NumMembers,
		RetransmitMult: mlCfg.RetransmitMult,
	}
	m.initWG.Done()
	return nil
}

The memberlist cluster is joined only in KV.running(), but that's too late, because at that point our code assumes the ring data structure is already populated:

func (m *KV) running(ctx context.Context) error {
	// Join the cluster, if configured. We want this to happen in Running state, because started memberlist
	// is good enough for usage from Client (which checks for Running state), even before it connects to the cluster.
	if len(m.cfg.JoinMembers) > 0 {
		// Lookup SRV records for given addresses to discover members.
		members := m.discoverMembers(ctx, m.cfg.JoinMembers)

		err := m.joinMembersOnStartup(ctx, members)
		if err != nil {
			level.Error(m.logger).Log("msg", "failed to join memberlist cluster", "err", err)

			if m.cfg.AbortIfJoinFails {
				return errFailedToJoinCluster
			}
		}
	}

Potential memberlist metrics issues.

Hey all,

We are using Tempo and trying to make use of the Tempo mixin Grafana dashboards: https://github.com/grafana/tempo/tree/main/operations/tempo-mixin

However, I've noticed that some of the metrics the dashboards expect from dskit/kv/memberlist/metrics might not be coming out correctly.

  1. tempo_memberlist_client_kv_store_count
    This chart https://github.com/grafana/tempo/blob/main/operations/tempo-mixin/dashboards/tempo-operational.json#L4585
    is looking for the metric tempo_memberlist_client_kv_store_count. I couldn't find it exposed on any of the /metrics endpoints I looked at.

I went looking for where this metric is configured and found this: https://github.com/grafana/dskit/blob/main/kv/memberlist/metrics.go#L123

	m.storeValuesDesc = prometheus.NewDesc(
		prometheus.BuildFQName(m.cfg.MetricsNamespace, subsystem, "kv_store_count"), // gauge
		"Number of values in KV Store",
		nil, nil)

I'm not super familiar with Prometheus, but the rest of the metric definitions in that file use NewGaugeVec or NewCounterVec. From what I can tell, NewDesc is used for metric metadata?
But the metric name "kv_store_count" sounds like it should be a numerical value?

  2. gauge_memberlist_health_score
    This chart https://github.com/grafana/tempo/blob/main/operations/tempo-mixin/dashboards/tempo-operational.json#LL4319C1-L4319C1 is looking for a metric gauge_memberlist_health_score. Again I couldn't find it exposed in the /metrics endpoint.

I went looking for where this metric is configured and I couldn't find it!

But I did find this: https://github.com/grafana/dskit/blob/main/kv/memberlist/metrics.go#L154

	m.memberlistHealthScore = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: m.cfg.MetricsNamespace,
		Subsystem: subsystem,
		Name:      "cluster_node_health_score",
		Help:      "Health score of this cluster. Lower value is better. 0 = healthy",
	}, func() float64 {
		// m.memberlist is not set before Starting state
		if m.State() == services.Running || m.State() == services.Stopping {
			return float64(m.memberlist.GetHealthScore())
		}
		return 0
	})

The Go struct field is memberlistHealthScore, similar to the metric I'm looking for, but the actual metric name is cluster_node_health_score.

Could this be a metric that has been renamed in code, with the dashboards not updated yet?

Also, I think the Tempo mixin might be wrong: gauge_memberlist_health_score seems like it should be tempo_memberlist_client_cluster_node_health_score or something?

Import `github.com/weaveworks/common/user`

How about importing the user lib from Weaveworks so that we can manage user IDs in the context better? E.g. it would be helpful to add user structs instead of ID strings.
