Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/backoff"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
)

const (
Expand Down Expand Up @@ -830,6 +831,8 @@ func (m *KV) notifyWatchers(key string) {
// After too many failed retries, this method returns error.
func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in any) (out any, retry bool, err error)) error {
var lastError error
var retryTimer *time.Timer
defer utiltimer.StopAndDrainTimer(retryTimer)

outer:
for retries := m.maxCasRetries; retries > 0; retries-- {
Expand All @@ -839,9 +842,14 @@ outer:
// We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
// with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
// By waiting for one second, we hope that Merge will be able to detect change from 'f' function.
if retryTimer == nil {
retryTimer = time.NewTimer(noChangeDetectedRetrySleep)
} else {
utiltimer.ResetTimer(retryTimer, noChangeDetectedRetrySleep)
}

select {
case <-time.After(noChangeDetectedRetrySleep):
case <-retryTimer.C:
// ok
case <-ctx.Done():
lastError = ctx.Err()
Expand Down
52 changes: 52 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,3 +1355,55 @@ func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld
func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func BenchmarkCASTimerAllocation(b *testing.B) {
c := dataCodec{}

var cfg KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = TCPTransportConfig{}
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
// Reduce max retries for faster benchmark
mkv.maxCasRetries = 3

require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

kv, err := NewClient(mkv, c)
require.NoError(b, err)

// Set up initial data
err = kv.CAS(context.Background(), "bench", func(in any) (out any, retry bool, err error) {
d := &data{Members: map[string]member{}}
d.Members["test"] = member{
Timestamp: time.Now().Unix(),
State: JOINING,
}
return d, true, nil
})
require.NoError(b, err)

b.ReportAllocs()

for b.Loop() {
retryCount := 0
// This will trigger 2 retries (total 3 attempts) before succeeding
_ = kv.CAS(context.Background(), "bench", func(in any) (out any, retry bool, err error) {
d := in.(*data)
retryCount++

// Trigger retries by returning same data
if retryCount < 3 {
return d, true, nil
}

// Make change on 3rd attempt
m := d.Members["test"]
m.Timestamp = time.Now().Unix()
d.Members["test"] = m
return d, true, nil
})
}
}
Loading