From cd533411169f81edc76d7c80a3b93027d630946c Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 19 Feb 2026 21:25:25 +0900 Subject: [PATCH 1/6] Enable HATracker memberlist Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/arguments.md | 2 +- docs/configuration/config-file-reference.md | 6 +- pkg/cortex/modules.go | 3 + pkg/ha/ha_tracker.go | 90 +++++++- pkg/ha/ha_tracker_test.go | 215 +++++++++++++++++++- schemas/cortex-config-schema.json | 2 +- 7 files changed, 304 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7c526927d..0f7b17cacc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. +* [FEATURE] HATracker: Add experimental support for memberlist as a KV store backend. * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. 
#7077 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 9ba9f31710..fa32ba010e 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que ### Ring/HA Tracker Store -The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store). +The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature). - `{ring,distributor.ha-tracker}.prefix` The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar. - `{ring,distributor.ha-tracker}.store` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 81b85fb018..bb0994e34e 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3103,9 +3103,9 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.enable-startup-sync [enable_startup_sync: | default = false] - # Backend storage to use for the ring. Please be aware that memberlist is not - # supported by the HA tracker since gossip propagation is too slow for HA - # purposes. + # Backend storage to use for the ring. Please be aware that memberlist support + # in the HA tracker is currently experimental since gossip propagation is too + # slow for HA purposes. kvstore: # Backend storage to use for the ring. Supported values are: consul, # dynamodb, etcd, inmemory, memberlist, multi. 
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fbc6db605f..4fe4502d89 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -37,6 +37,7 @@ import ( "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" + "github.com/cortexproject/cortex/pkg/ha" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.MetricsRegisterer = reg t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), + ha.GetReplicaDescCodec(), } dnsProviderReg := prometheus.WrapRegistererWithPrefix( "cortex_", @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { // Update the config. t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 11b38eb79c..5358cb6cd3 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -7,7 +7,6 @@ import ( "fmt" "maps" "math/rand" - "slices" "strings" "sync" "time" @@ -21,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -72,7 +72,7 @@ type HATrackerConfig struct { // of tracked keys is large. 
EnableStartupSync bool `yaml:"enable_startup_sync"` - KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."` + KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist support in the HA tracker is currently experimental since gossip propagation is too slow for HA purposes."` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -80,6 +80,85 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } +func (d *ReplicaDesc) Clone() interface{} { + return proto.Clone(d) +} + +// Merge merges other ReplicaDesc into this one and can be sent out to other clients. +// This merge function depends on the timestamp of the replica. It will choose more recent state +// from the two descriptors based on ReceivedAt timestamp. 
+func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + other, ok := mergeable.(*ReplicaDesc) + if !ok { + return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable) + } + + if other == nil { + return nil, nil + } + + // If other has been deleted, take the deletion unless this descriptor already has an equal or newer one + if other.DeletedAt > 0 { + if d.DeletedAt == 0 || other.DeletedAt > d.DeletedAt { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc), nil + } + return nil, nil + } + + // If this descriptor is deleted but other isn't, and other is more recent, take the other + if d.DeletedAt > 0 && other.DeletedAt == 0 && other.ReceivedAt > d.ReceivedAt { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc), nil + } + + // Choose the descriptor with the more recent timestamp + if other.ReceivedAt > d.ReceivedAt { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc), nil + } + + // If timestamps are exactly equal but replicas differ, use lexicographic ordering + if other.ReceivedAt == d.ReceivedAt && other.Replica != d.Replica { + // Choose the lexicographically smaller replica + if other.Replica < d.Replica { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc), nil + } + return nil, nil + } + + // No change (other is older, or same timestamp and same replica) + return nil, nil +} + +// MergeContent describes content of this Mergeable. +// For ReplicaDesc, we return the replica name. +func (d *ReplicaDesc) MergeContent() []string { + if d.Replica == "" { + return nil + } + return []string{d.Replica} +} + +// RemoveTombstones is a no-op for ReplicaDesc.
+func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { + // No-op: HATracker manages tombstones via cleanupOldReplicas + return +} + // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) { finalFlagPrefix := "" @@ -116,12 +195,7 @@ func (cfg *HATrackerConfig) Validate() error { return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) } - // Tracker kv store only supports consul and etcd. - storeAllowedList := []string{"consul", "etcd"} - if slices.Contains(storeAllowedList, cfg.KVStore.Store) { - return nil - } - return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) + return nil } func GetReplicaDescCodec() codec.Proto { diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 882a4c8868..fac5f434f7 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -20,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -108,14 +109,23 @@ func TestHATrackerConfig_Validate(t *testing.T) { }(), expectedErr: nil, }, - "should failed with invalid kv store": { + "should pass with memberlist kv store": { cfg: func() HATrackerConfig { cfg := HATrackerConfig{} flagext.DefaultValues(&cfg) cfg.KVStore.Store = "memberlist" return cfg }(), - expectedErr: fmt.Errorf("invalid HATracker KV store type: %s", "memberlist"), + expectedErr: nil, + }, + "should pass with multi kv store": { + cfg: func() HATrackerConfig { + cfg := HATrackerConfig{} + flagext.DefaultValues(&cfg) + cfg.KVStore.Store = "multi" + return cfg + }(), + expectedErr: nil, }, } 
@@ -945,3 +955,204 @@ func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *HATracke require.Equal(t, expectedMarkedForDeletion, markedForDeletion, "KV entry marked for deletion") } } + +func TestReplicaDesc_Merge(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + current *ReplicaDesc + other *ReplicaDesc + expectChange bool + expectedResult *ReplicaDesc + }{ + { + name: "merge with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with older replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with deleted replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "undelete with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + 
other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with nil other", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: nil, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "merge deleted with more recent deleted", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "same timestamp, different replica - choose lexicographically smaller", + current: &ReplicaDesc{ + Replica: "replica-b", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "same timestamp, same replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + } + + for _, tt 
:= range tests { + t.Run(tt.name, func(t *testing.T) { + var change memberlist.Mergeable + var err error + + if tt.other != nil { + change, err = tt.current.Merge(tt.other, false) + } else { + change, err = tt.current.Merge(nil, false) + } + + require.NoError(t, err) + + if tt.expectChange { + require.NotNil(t, change, "expected a change to be returned") + } else { + require.Nil(t, change, "expected no change to be returned") + } + + assert.Equal(t, tt.expectedResult.Replica, tt.current.Replica) + assert.Equal(t, tt.expectedResult.ReceivedAt, tt.current.ReceivedAt) + assert.Equal(t, tt.expectedResult.DeletedAt, tt.current.DeletedAt) + }) + } +} + +func TestReplicaDesc_MergeContent(t *testing.T) { + desc := &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(time.Now()), + DeletedAt: 0, + } + + content := desc.MergeContent() + require.Equal(t, []string{"replica1"}, content) + + emptyDesc := &ReplicaDesc{} + emptyContent := emptyDesc.MergeContent() + require.Nil(t, emptyContent) +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index dfbd85f685..8ef9c3bfbb 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3750,7 +3750,7 @@ "x-format": "duration" }, "kvstore": { - "description": "Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes.", + "description": "Backend storage to use for the ring. 
Please be aware that memberlist support in the HA tracker is currently experimental since gossip propagation is too slow for HA purposes.", "properties": { "consul": { "$ref": "#/definitions/consul_config" From 44573bf75feb1d013b91e5b486d658caa213e528 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 20 Feb 2026 11:31:16 +0900 Subject: [PATCH 2/6] changelog Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f7b17cacc..91b64a10fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. -* [FEATURE] HATracker: Add experimental support for memberlist as a KV store backend. +* [FEATURE] HATracker: Add experimental support for memberlist as a KV store backend. #7284 * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. 
#7077 From 8d418a47ff458ffb3708b7f684700c1edc3f36c0 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 20 Feb 2026 11:49:26 +0900 Subject: [PATCH 3/6] lint Signed-off-by: SungJin1212 --- pkg/ha/ha_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 5358cb6cd3..457498d893 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -80,7 +80,7 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } -func (d *ReplicaDesc) Clone() interface{} { +func (d *ReplicaDesc) Clone() any { return proto.Clone(d) } From aa4791ae6dbd23c1aa1ccd96455d1d945f9d4db1 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 23 Feb 2026 08:20:04 +0900 Subject: [PATCH 4/6] Update ha tracker KVStore doc Signed-off-by: SungJin1212 --- docs/configuration/config-file-reference.md | 5 ++--- pkg/ha/ha_tracker.go | 3 ++- schemas/cortex-config-schema.json | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index bb0994e34e..c47eede8dc 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3103,9 +3103,8 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.enable-startup-sync [enable_startup_sync: | default = false] - # Backend storage to use for the ring. Please be aware that memberlist support - # in the HA tracker is currently experimental since gossip propagation is too - # slow for HA purposes. + # Backend storage to use for the ring. Memberlist support in the HA tracker is + # experimental, as gossip propagation delays may impact HA performance. kvstore: # Backend storage to use for the ring. Supported values are: consul, # dynamodb, etcd, inmemory, memberlist, multi. 
diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 457498d893..99cf681ae8 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -7,6 +7,7 @@ import ( "fmt" "maps" "math/rand" + "slices" "strings" "sync" "time" @@ -72,7 +73,7 @@ type HATrackerConfig struct { // of tracked keys is large. EnableStartupSync bool `yaml:"enable_startup_sync"` - KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist support in the HA tracker is currently experimental since gossip propagation is too slow for HA purposes."` + KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8ef9c3bfbb..378fc91694 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3750,7 +3750,7 @@ "x-format": "duration" }, "kvstore": { - "description": "Backend storage to use for the ring. Please be aware that memberlist support in the HA tracker is currently experimental since gossip propagation is too slow for HA purposes.", + "description": "Backend storage to use for the ring. 
Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance.", "properties": { "consul": { "$ref": "#/definitions/consul_config" From 269356e67d86fa44abb766f7a5f7de21f59a4a18 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 23 Feb 2026 08:21:29 +0900 Subject: [PATCH 5/6] add memberlist and multi to allow list Signed-off-by: SungJin1212 --- pkg/ha/ha_tracker.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 99cf681ae8..3493c82134 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -196,6 +196,12 @@ func (cfg *HATrackerConfig) Validate() error { return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) } + // Tracker kv store only supports consul, etcd, memberlist, and multi. + storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"} + if !slices.Contains(storeAllowedList, cfg.KVStore.Store) { + return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) + } + return nil } From b958c62b60775568829d62441bbd3a6e1ad64bc6 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 25 Feb 2026 14:03:20 +0900 Subject: [PATCH 6/6] Add e2e test Signed-off-by: SungJin1212 --- ...tegration_memberlist_single_binary_test.go | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index a79642e1dd..a1c0e30f97 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -246,3 +247,73 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { "expected all instances to have %f ring members and %f tombstones", expectedRingMembers, expectedTombstones) } + 
+func TestHATrackerWithMemberlist(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "memberlist", + + // Use short timeouts so failover happens quickly + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex := newSingleBinary("cortex", "", "", flags) + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + now := time.Now() + numUsers := 100 + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total")) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably
passed. + time.Sleep(5 * time.Second) + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // This time, we send data from replica1 instead of replica0. + series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Since the leader successfully failed over to replica1, the change count increments by 1 per user + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total")) +}