diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 206157c284a..48b5bd9266f 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -23,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/backoff" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" + utiltimer "github.com/cortexproject/cortex/pkg/util/timer" ) const ( @@ -830,6 +831,8 @@ func (m *KV) notifyWatchers(key string) { // After too many failed retries, this method returns error. func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in any) (out any, retry bool, err error)) error { var lastError error + var retryTimer *time.Timer + defer utiltimer.StopAndDrainTimer(retryTimer) outer: for retries := m.maxCasRetries; retries > 0; retries-- { @@ -839,9 +842,14 @@ outer: // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution. // By waiting for one second, we hope that Merge will be able to detect change from 'f' function. + if retryTimer == nil { + retryTimer = time.NewTimer(noChangeDetectedRetrySleep) + } else { + utiltimer.ResetTimer(retryTimer, noChangeDetectedRetrySleep) + } select { - case <-time.After(noChangeDetectedRetrySleep): + case <-retryTimer.C: // ok case <-ctx.Done(): lastError = ctx.Err() diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index ed093670d10..2ac167665a4 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -1355,3 +1355,55 @@ func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld func (p dnsProviderMock) Addresses() []string { return p.resolved } + +func BenchmarkCASTimerAllocation(b *testing.B) { + c := dataCodec{} + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{} + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + // Reduce max retries for faster benchmark + mkv.maxCasRetries = 3 + + require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(b, err) + + // Set up initial data + err = kv.CAS(context.Background(), "bench", func(in any) (out any, retry bool, err error) { + d := &data{Members: map[string]member{}} + d.Members["test"] = member{ + Timestamp: time.Now().Unix(), + State: JOINING, + } + return d, true, nil + }) + require.NoError(b, err) + + b.ReportAllocs() + + for b.Loop() { + retryCount := 0 + // This will trigger 2 retries (total 3 attempts) before succeeding + _ = kv.CAS(context.Background(), "bench", func(in any) (out any, retry bool, err error) { + d := in.(*data) + retryCount++ + + // Trigger retries by returning same data + if retryCount < 3 { + return d, true, nil + } + + // Make change on 3rd attempt + m := d.Members["test"] + m.Timestamp = time.Now().Unix() + d.Members["test"] = m + return d, true, nil + }) + } +}