diff --git a/CHANGELOG.md b/CHANGELOG.md index 75425534bf..fbdff084fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. +* [FEATURE] HATracker: Add experimental support for memberlist as a KV store backend. #7284 * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 9ba9f31710..fa32ba010e 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que ### Ring/HA Tracker Store -The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store). +The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature). - `{ring,distributor.ha-tracker}.prefix` The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar. 
- `{ring,distributor.ha-tracker}.store` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 81b85fb018..c47eede8dc 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3103,9 +3103,8 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.enable-startup-sync [enable_startup_sync: | default = false] - # Backend storage to use for the ring. Please be aware that memberlist is not - # supported by the HA tracker since gossip propagation is too slow for HA - # purposes. + # Backend storage to use for the ring. Memberlist support in the HA tracker is + # experimental, as gossip propagation delays may impact HA performance. kvstore: # Backend storage to use for the ring. Supported values are: consul, # dynamodb, etcd, inmemory, memberlist, multi. diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fbc6db605f..4fe4502d89 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -37,6 +37,7 @@ import ( "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" + "github.com/cortexproject/cortex/pkg/ha" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.MetricsRegisterer = reg t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), + ha.GetReplicaDescCodec(), } dnsProviderReg := prometheus.WrapRegistererWithPrefix( "cortex_", @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { // Update the config. 
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 11b38eb79c..3493c82134 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -72,7 +73,7 @@ type HATrackerConfig struct { // of tracked keys is large. EnableStartupSync bool `yaml:"enable_startup_sync"` - KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."` + KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -80,6 +81,85 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } +func (d *ReplicaDesc) Clone() any { + return proto.Clone(d) +} + +// Merge merges other ReplicaDesc into this one and can be sent out to other clients. +// This merge function depends on the timestamp of the replica. It will choose more recent state +// from the two descriptors based on ReceivedAt timestamp. 
+func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) {
+	// An untyped nil carries no state, so there is nothing to merge.
+	if mergeable == nil {
+		return nil, nil
+	}
+
+	other, ok := mergeable.(*ReplicaDesc)
+	if !ok {
+		return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable)
+	}
+
+	// A typed nil pointer wrapped in the interface passes the assertion above,
+	// so it has to be rejected separately before any field is read.
+	if other == nil {
+		return nil, nil
+	}
+
+	// Decide the winner with a single total order so that the result does not
+	// depend on which side calls Merge; otherwise two gossiping nodes could
+	// each adopt the other's state and never converge.
+	//   1. The larger ReceivedAt wins. A fresh election therefore revives a
+	//      tombstone, and a tombstone computed from stale data cannot
+	//      override a newer election.
+	//   2. For the same ReceivedAt, the larger DeletedAt wins, so a deletion
+	//      mark beats the live entry it was derived from and a later mark
+	//      beats an earlier one.
+	//   3. If both timestamps match, the lexicographically smaller replica
+	//      name wins, purely as a deterministic tie-break.
+	var otherWins bool
+	switch {
+	case other.ReceivedAt != d.ReceivedAt:
+		otherWins = other.ReceivedAt > d.ReceivedAt
+	case other.DeletedAt != d.DeletedAt:
+		otherWins = other.DeletedAt > d.DeletedAt
+	default:
+		otherWins = other.Replica < d.Replica
+	}
+
+	// The local state is already the winner (or identical): report no change.
+	if !otherWins {
+		return nil, nil
+	}
+
+	// Adopt the winning state in place and return a copy of it as the change,
+	// so the returned value does not alias the receiver.
+	d.Replica = other.Replica
+	d.ReceivedAt = other.ReceivedAt
+	d.DeletedAt = other.DeletedAt
+	return proto.Clone(d).(*ReplicaDesc), nil
+}
+
+// MergeContent describes content of this Mergeable.
+// For ReplicaDesc it returns a single-element slice holding the replica name,
+// or nil when no replica name is set.
+func (d *ReplicaDesc) MergeContent() []string {
+	if d.Replica == "" {
+		return nil
+	}
+	return []string{d.Replica}
+}
+
+// RemoveTombstones is a no-op for ReplicaDesc.
+func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { + // No-op: HATracker manages tombstones via cleanupOldReplicas + return +} + // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) { finalFlagPrefix := "" @@ -116,12 +196,13 @@ func (cfg *HATrackerConfig) Validate() error { return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) } - // Tracker kv store only supports consul and etcd. - storeAllowedList := []string{"consul", "etcd"} - if slices.Contains(storeAllowedList, cfg.KVStore.Store) { - return nil + // Tracker kv store only supports consul, etcd, memberlist, and multi. + storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"} + if !slices.Contains(storeAllowedList, cfg.KVStore.Store) { + return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) } - return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) + + return nil } func GetReplicaDescCodec() codec.Proto { diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 882a4c8868..fac5f434f7 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -20,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -108,14 +109,23 @@ func TestHATrackerConfig_Validate(t *testing.T) { }(), expectedErr: nil, }, - "should failed with invalid kv store": { + "should pass with memberlist kv store": { cfg: func() HATrackerConfig { cfg := HATrackerConfig{} flagext.DefaultValues(&cfg) cfg.KVStore.Store = "memberlist" return cfg }(), - expectedErr: 
fmt.Errorf("invalid HATracker KV store type: %s", "memberlist"), + expectedErr: nil, + }, + "should pass with multi kv store": { + cfg: func() HATrackerConfig { + cfg := HATrackerConfig{} + flagext.DefaultValues(&cfg) + cfg.KVStore.Store = "multi" + return cfg + }(), + expectedErr: nil, }, } @@ -945,3 +955,204 @@ func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *HATracke require.Equal(t, expectedMarkedForDeletion, markedForDeletion, "KV entry marked for deletion") } } + +func TestReplicaDesc_Merge(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + current *ReplicaDesc + other *ReplicaDesc + expectChange bool + expectedResult *ReplicaDesc + }{ + { + name: "merge with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with older replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with deleted replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: 
timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "undelete with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with nil other", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: nil, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "merge deleted with more recent deleted", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "same timestamp, different replica - choose lexicographically smaller", + current: &ReplicaDesc{ + Replica: "replica-b", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "same timestamp, same replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: 
timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var change memberlist.Mergeable + var err error + + if tt.other != nil { + change, err = tt.current.Merge(tt.other, false) + } else { + change, err = tt.current.Merge(nil, false) + } + + require.NoError(t, err) + + if tt.expectChange { + require.NotNil(t, change, "expected a change to be returned") + } else { + require.Nil(t, change, "expected no change to be returned") + } + + assert.Equal(t, tt.expectedResult.Replica, tt.current.Replica) + assert.Equal(t, tt.expectedResult.ReceivedAt, tt.current.ReceivedAt) + assert.Equal(t, tt.expectedResult.DeletedAt, tt.current.DeletedAt) + }) + } +} + +func TestReplicaDesc_MergeContent(t *testing.T) { + desc := &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(time.Now()), + DeletedAt: 0, + } + + content := desc.MergeContent() + require.Equal(t, []string{"replica1"}, content) + + emptyDesc := &ReplicaDesc{} + emptyContent := emptyDesc.MergeContent() + require.Nil(t, emptyContent) +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index dfbd85f685..378fc91694 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3750,7 +3750,7 @@ "x-format": "duration" }, "kvstore": { - "description": "Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes.", + "description": "Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance.", "properties": { "consul": { "$ref": "#/definitions/consul_config"