From 02e7538546a63961764105e7b2eaef6f34a85927 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Tue, 17 Feb 2026 14:26:37 -0800 Subject: [PATCH 1/3] Add store-gateway max series bytes before rejecting incoming requests to prevent oomkill Signed-off-by: Essam Eldaly --- pkg/storage/tsdb/config.go | 6 + pkg/storage/tsdb/config_test.go | 18 + pkg/storegateway/bucket_stores.go | 53 +- pkg/storegateway/bucket_stores_bench_test.go | 2 +- pkg/storegateway/bucket_stores_test.go | 28 +- pkg/storegateway/concurrent_bytes_tracker.go | 148 ++ .../concurrent_bytes_tracker_test.go | 1249 +++++++++++++++++ pkg/storegateway/gateway.go | 9 +- pkg/storegateway/limiter.go | 16 +- .../parquet_bucket_store_bench_test.go | 2 +- .../parquet_bucket_stores_test.go | 4 +- pkg/storegateway/tracking_bytes_limiter.go | 109 ++ .../tracking_bytes_limiter_test.go | 318 +++++ 13 files changed, 1923 insertions(+), 39 deletions(-) create mode 100644 pkg/storegateway/concurrent_bytes_tracker.go create mode 100644 pkg/storegateway/concurrent_bytes_tracker_test.go create mode 100644 pkg/storegateway/tracking_bytes_limiter.go create mode 100644 pkg/storegateway/tracking_bytes_limiter_test.go diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index fa6f7b1c938..4d52b223989 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -64,6 +64,7 @@ var ( ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0") ErrInvalidBucketStoreType = errors.New("invalid bucket store type") + ErrInvalidMaxConcurrentBytes = errors.New("max concurrent bytes must be non-negative") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -281,6 +282,7 @@ type BucketStoreConfig struct { SyncInterval time.Duration `yaml:"sync_interval"` MaxConcurrent int `yaml:"max_concurrent"` MaxInflightRequests int `yaml:"max_inflight_requests"` + MaxConcurrentBytes int64 `yaml:"max_concurrent_bytes"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` @@ -365,6 +367,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.") + f.Int64Var(&cfg.MaxConcurrentBytes, "blocks-storage.bucket-store.max-concurrent-bytes", 0, "Max number of bytes being processed concurrently across all queries. When the limit is reached, new requests are rejected with HTTP 503. 0 to disable.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants syncing blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks syncing per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") @@ -429,6 +432,9 @@ func (cfg *BucketStoreConfig) Validate() error { if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 { return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio } + if cfg.MaxConcurrentBytes < 0 { + return ErrInvalidMaxConcurrentBytes + } return nil } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 7a642cc6006..41ec872425f 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errUnSupportedWALCompressionType, }, + "should fail on negative max concurrent bytes": { + setup: func(cfg *BlocksStorageConfig) { + cfg.BucketStore.MaxConcurrentBytes = -1 + }, + expectedErr: ErrInvalidMaxConcurrentBytes, + }, + "should pass on zero max concurrent bytes (disabled)": { + setup: func(cfg *BlocksStorageConfig) { + cfg.BucketStore.MaxConcurrentBytes = 0 + }, + expectedErr: nil, + }, + "should pass on positive max concurrent bytes": { + setup: func(cfg *BlocksStorageConfig) { + cfg.BucketStore.MaxConcurrentBytes = 1024 * 1024 * 1024 // 1GB + }, + expectedErr: nil, + }, } for testName, testData := range tests { diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index f017457a9f3..6c8da2e558b 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -95,6 +95,13 @@ type ThanosBucketStores struct { // Keeps number of inflight requests inflightRequests *util.InflightRequestTracker + // Concurrent bytes tracker for limiting bytes being processed across all queries. + concurrentBytesTracker ConcurrentBytesTracker + + // Registry holder for tracking bytes limiters per request. + // This is used to ensure cleanup happens when requests complete. + trackingRegistryHolder *trackingLimiterRegistryHolder + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -105,19 +112,20 @@ type ThanosBucketStores struct { var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) { +// Note: concurrentBytesTracker is currently only used by the TSDB bucket store implementation. +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (BucketStores, error) { switch cfg.BucketStore.BucketStoreType { case string(tsdb.ParquetBucketStore): return newParquetBucketStores(cfg, bucketClient, limits, logger, reg) case string(tsdb.TSDBBucketStore): - return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg) + return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg, concurrentBytesTracker) default: return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType) } } // newThanosBucketStores creates a new TSDB-based bucket stores -func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) { +func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (*ThanosBucketStores, error) { matchers := tsdb.NewMatchers() cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg) if err != nil { @@ -133,20 +141,22 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi }).Set(float64(cfg.BucketStore.MaxConcurrent)) u := &ThanosBucketStores{ - logger: logger, - cfg: cfg, - limits: limits, - bucket: cachingBucket, - shardingStrategy: shardingStrategy, - stores: map[string]*store.BucketStore{}, - storesErrors: map[string]error{}, - logLevel: logLevel, - bucketStoreMetrics: NewBucketStoreMetrics(), - metaFetcherMetrics: NewMetadataFetcherMetrics(), - queryGate: queryGate, - partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), - userTokenBuckets: make(map[string]*util.TokenBucket), - inflightRequests: util.NewInflightRequestTracker(), + logger: logger, + cfg: cfg, + limits: limits, + bucket: cachingBucket, + shardingStrategy: shardingStrategy, + stores: map[string]*store.BucketStore{}, + storesErrors: map[string]error{}, + logLevel: logLevel, + bucketStoreMetrics: NewBucketStoreMetrics(), + metaFetcherMetrics: NewMetadataFetcherMetrics(), + queryGate: queryGate, + partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), + userTokenBuckets: make(map[string]*util.TokenBucket), + inflightRequests: util.NewInflightRequestTracker(), + concurrentBytesTracker: concurrentBytesTracker, + trackingRegistryHolder: &trackingLimiterRegistryHolder{}, syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -381,6 +391,13 @@ func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Stor defer u.inflightRequests.Dec() } + registry := newTrackingBytesLimiterRegistry() + u.trackingRegistryHolder.SetRegistry(registry) + defer func() { + u.trackingRegistryHolder.ClearRegistry() + registry.ReleaseAll() + }() + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, @@ -697,7 +714,7 @@ func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore u.syncDirForUser(userID), newChunksLimiterFactory(u.limits, userID), newSeriesLimiterFactory(u.limits, userID), - newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve), + newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve, u.concurrentBytesTracker, u.trackingRegistryHolder), u.partitioner, u.cfg.BucketStore.BlockSyncConcurrency, false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers diff --git a/pkg/storegateway/bucket_stores_bench_test.go b/pkg/storegateway/bucket_stores_bench_test.go index 2dd9c2207f7..4c8c0715042 100644 --- a/pkg/storegateway/bucket_stores_bench_test.go +++ b/pkg/storegateway/bucket_stores_bench_test.go @@ -48,7 +48,7 @@ func BenchmarkThanosBucketStores_SeriesBatch(b *testing.B) { require.NoError(b, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(b, err) // Perform Initial Sync to load blocks diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 4bf3e7af065..ccd6c27ffd3 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -125,7 +125,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) if tc.mockInitialSync { @@ -208,7 +208,7 @@ func TestBucketStores_InitialSync(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) // Query series before the initial sync. @@ -285,7 +285,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { bucket = &failFirstGetBucket{Bucket: bucket} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) // Initial sync should succeed even if a transient error occurs. @@ -346,7 +346,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) // Run an initial sync to discover 1 block. @@ -446,7 +446,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { bucketClient.MockExists(path.Join(users.GlobalMarkersDir, "user-3", users.TenantDeletionMarkFile), false, nil) bucketClient.MockExists(path.Join("user-3", "markers", users.TenantDeletionMarkFile), false, nil) - stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) + stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil, nil) require.NoError(t, err) // Sync user stores and count the number of times the callback is called. @@ -523,7 +523,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) require.NoError(t, stores.InitialSync(ctx)) @@ -579,7 +579,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -602,7 +602,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -653,7 +653,7 @@ func TestBucketStores_SyncBlocksWithIgnoreBlocksBefore(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), - objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) // Perform initial sync @@ -845,7 +845,7 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) { sharding := userShardingStrategy{} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) // Perform sync. @@ -944,7 +944,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) { assert.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) assert.NoError(t, err) thanosStores := stores.(*ThanosBucketStores) assert.NotNil(t, thanosStores.instanceTokenBucket) @@ -966,7 +966,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) { cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDryRun) sharding.users = []string{user1, user2} reg = prometheus.NewPedanticRegistry() - stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) assert.NoError(t, err) thanosStores = stores.(*ThanosBucketStores) assert.NotNil(t, thanosStores.instanceTokenBucket) @@ -978,7 +978,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) { cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDisabled) sharding.users = []string{user1, user2} reg = prometheus.NewPedanticRegistry() - stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) assert.NoError(t, err) assert.NoError(t, stores.InitialSync(ctx)) @@ -1002,7 +1002,7 @@ func TestBucketStores_getTokensToRetrieve(t *testing.T) { assert.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) assert.NoError(t, err) thanosStores := stores.(*ThanosBucketStores) diff --git a/pkg/storegateway/concurrent_bytes_tracker.go b/pkg/storegateway/concurrent_bytes_tracker.go new file mode 100644 index 00000000000..b614c8a04ad --- /dev/null +++ b/pkg/storegateway/concurrent_bytes_tracker.go @@ -0,0 +1,148 @@ +package storegateway + +import ( + "context" + "net/http" + "sync/atomic" + "time" + + "github.com/cortexproject/cortex/pkg/util/requestmeta" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/httpgrpc" +) + +const peakResetInterval = 30 * time.Second + +type ConcurrentBytesTracker interface { + TryAccept(ctx context.Context) error + Add(bytes uint64) func() + Release(bytes uint64) + Current() uint64 + Stop() +} + +type concurrentBytesTracker struct { + maxConcurrentBytes uint64 + currentBytes atomic.Uint64 + peakBytes atomic.Uint64 + stop chan struct{} + + peakBytesGauge prometheus.Gauge + maxBytesGauge prometheus.Gauge + rejectedRequestsTotal prometheus.Counter +} + +func NewConcurrentBytesTracker(maxConcurrentBytes uint64, reg prometheus.Registerer) ConcurrentBytesTracker { + tracker := &concurrentBytesTracker{ + maxConcurrentBytes: maxConcurrentBytes, + stop: make(chan struct{}), + peakBytesGauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_storegateway_concurrent_bytes_peak", + Help: "Peak concurrent bytes observed in the last 30s window.", + }), + maxBytesGauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_storegateway_concurrent_bytes_max", + Help: "Configured maximum concurrent bytes limit.", + }), + rejectedRequestsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_storegateway_bytes_limiter_rejected_requests_total", + Help: "Total requests rejected due to concurrent bytes limit.", + }), + } + + tracker.maxBytesGauge.Set(float64(maxConcurrentBytes)) + if reg != nil { + reg.MustRegister(tracker.peakBytesGauge) + reg.MustRegister(tracker.maxBytesGauge) + reg.MustRegister(tracker.rejectedRequestsTotal) + } + + go tracker.publishPeakLoop() + + return tracker +} + +func (t *concurrentBytesTracker) TryAccept(ctx context.Context) error { + if ctx != nil { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Context is not cancelled, continue + } + } + + if t.maxConcurrentBytes == 0 { + return nil + } + + current := t.currentBytes.Load() + if current >= t.maxConcurrentBytes { + t.rejectedRequestsTotal.Inc() + reqID := requestmeta.RequestIdFromContext(ctx) + return httpgrpc.Errorf(http.StatusServiceUnavailable, + "concurrent bytes limit reached: current=%d, max=%d, request_id=%s", current, t.maxConcurrentBytes, reqID) + } + return nil +} + +func (t *concurrentBytesTracker) Add(bytes uint64) func() { + newValue := t.currentBytes.Add(bytes) + for { + peak := t.peakBytes.Load() + if newValue <= peak { + break + } + if t.peakBytes.CompareAndSwap(peak, newValue) { + break + } + // CAS failed, retry + } + + return func() { + t.Release(bytes) + } +} + +func (t *concurrentBytesTracker) Release(bytes uint64) { + for { + current := t.currentBytes.Load() + newValue := current - bytes + if t.currentBytes.CompareAndSwap(current, newValue) { + return + } + // CAS failed, retry + } +} + +func (t *concurrentBytesTracker) Current() uint64 { + return t.currentBytes.Load() +} + +func (t *concurrentBytesTracker) publishPeakLoop() { + ticker := time.NewTicker(peakResetInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + current := t.currentBytes.Load() + peak := t.peakBytes.Swap(current) + if current > peak { + peak = current + } + t.peakBytesGauge.Set(float64(peak)) + case <-t.stop: + return + } + } +} + +func (t *concurrentBytesTracker) Stop() { + select { + case <-t.stop: + // Already stopped. + default: + close(t.stop) + } +} diff --git a/pkg/storegateway/concurrent_bytes_tracker_test.go b/pkg/storegateway/concurrent_bytes_tracker_test.go new file mode 100644 index 00000000000..685f32c08c2 --- /dev/null +++ b/pkg/storegateway/concurrent_bytes_tracker_test.go @@ -0,0 +1,1249 @@ +package storegateway + +import ( + "context" + "math/rand" + "sync" + "testing" + "testing/quick" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store" +) + +func TestConcurrentBytesTracker_Basic(t *testing.T) { + t.Run("add increments counter", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(1000, nil) + assert.Equal(t, uint64(0), tracker.Current()) + + release := tracker.Add(100) + assert.Equal(t, uint64(100), tracker.Current()) + + release() + assert.Equal(t, uint64(0), tracker.Current()) + }) + + t.Run("try accept rejects when at limit", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(100, nil) + + // Add bytes to reach the limit + tracker.Add(100) + + // Should reject new requests + err := tracker.TryAccept(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "concurrent bytes limit reached") + }) + + t.Run("try accept allows when below limit", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(100, nil) + + // Add bytes below the limit + tracker.Add(50) + + // Should accept new requests + err := tracker.TryAccept(context.Background()) + assert.NoError(t, err) + }) +} + +func TestTrackerWithLimitingDisabled(t *testing.T) { + tracker := NewConcurrentBytesTracker(0, nil) + + t.Run("try accept always succeeds when limiting disabled", func(t *testing.T) { + err := tracker.TryAccept(context.Background()) + assert.NoError(t, err) + }) + + t.Run("current returns actual tracked bytes", func(t *testing.T) { + // Create a fresh tracker for this test + tr := NewConcurrentBytesTracker(0, nil) + assert.Equal(t, uint64(0), tr.Current()) + + // Add bytes and verify they are tracked + release := tr.Add(1000) + assert.Equal(t, uint64(1000), tr.Current()) + + // Release and verify counter decrements + release() + assert.Equal(t, uint64(0), tr.Current()) + }) + + t.Run("add tracks bytes even without limit", func(t *testing.T) { + // Create a fresh tracker for this test + tr := NewConcurrentBytesTracker(0, nil) + release := tr.Add(1000) + assert.Equal(t, uint64(1000), tr.Current()) + release() + assert.Equal(t, uint64(0), tr.Current()) + }) + + t.Run("try accept succeeds even with high byte count", func(t *testing.T) { + // Create a fresh tracker for this test + tr := NewConcurrentBytesTracker(0, nil) + // Add a large amount of bytes + tr.Add(uint64(100) * 1024 * 1024 * 1024) // 100GB + + // TryAccept should still succeed because limiting is disabled + err := tr.TryAccept(context.Background()) + assert.NoError(t, err) + }) +} + +// Property-based tests using testing/quick + +// TestProperty_AddIncrementsCounter tests Property 1: Add increments counter correctly +// **Feature: storegateway-max-data-limit, Property 1: Add increments counter correctly** +// **Validates: Requirements 1.1, 5.1** +func TestProperty_AddIncrementsCounter(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(bytes uint64) bool { + // Limit bytes to reasonable range (1 byte to 1GB) + bytes = (bytes % (1024 * 1024 * 1024)) + 1 + + tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) // 10GB limit + initialValue := tracker.Current() + + tracker.Add(bytes) + newValue := tracker.Current() + + return newValue == initialValue+bytes + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_ReleaseRoundTrip tests Property 2: Release round-trip restores counter +// **Feature: storegateway-max-data-limit, Property 2: Release round-trip restores counter** +// **Validates: Requirements 1.2** +func TestProperty_ReleaseRoundTrip(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(bytes uint64) bool { + // Limit bytes to reasonable range + bytes = (bytes % (1024 * 1024 * 1024)) + 1 + + tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) + initialValue := tracker.Current() + + release := tracker.Add(bytes) + release() + + return tracker.Current() == initialValue + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_CancellationReleasesBytes tests Property 3: Cancellation releases bytes +// **Feature: storegateway-max-data-limit, Property 3: Cancellation releases bytes** +// **Validates: Requirements 1.3** +func TestProperty_CancellationReleasesBytes(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(bytes uint64) bool { + // Limit bytes to reasonable range + bytes = (bytes % (1024 * 1024 * 1024)) + 1 + + tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) + + // Simulate a request that tracks bytes + release := tracker.Add(bytes) + afterAdd := tracker.Current() + + // Simulate cancellation by calling release + release() + afterRelease := tracker.Current() + + return afterAdd == bytes && afterRelease == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_ThreadSafeCounterUpdates tests Property 14: Thread-safe counter updates +// **Feature: storegateway-max-data-limit, Property 14: Thread-safe counter updates** +// **Validates: Requirements 4.5** +func TestProperty_ThreadSafeCounterUpdates(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + numOps := rng.Intn(100) + 1 // 1 to 100 operations + bytesPerOp := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + + var wg sync.WaitGroup + releases := make([]func(), numOps) + + // Concurrently add bytes + for i := range numOps { + wg.Add(1) + go func(idx int) { + defer wg.Done() + releases[idx] = tracker.Add(bytesPerOp) + }(i) + } + wg.Wait() + + // Verify all adds were counted + expectedAfterAdds := uint64(numOps) * bytesPerOp + if tracker.Current() != expectedAfterAdds { + return false + } + + // Concurrently release bytes + for i := range numOps { + wg.Add(1) + go func(idx int) { + defer wg.Done() + releases[idx]() + }(i) + } + wg.Wait() + + // Final value should be 0 + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +func TestConcurrentBytesTracker_Metrics(t *testing.T) { + reg := prometheus.NewRegistry() + tracker := NewConcurrentBytesTracker(1000, reg) + + // Add some bytes + release := tracker.Add(500) + + // Check metrics are registered + metricFamilies, err := reg.Gather() + require.NoError(t, err) + + metricNames := make(map[string]bool) + for _, mf := range metricFamilies { + metricNames[mf.GetName()] = true + } + + assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_peak"]) + assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_max"]) + assert.True(t, metricNames["cortex_storegateway_bytes_limiter_rejected_requests_total"]) + + release() +} + +func TestConcurrentBytesTracker_ContextCancellation(t *testing.T) { + t.Run("cancelled context returns error", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(1000, nil) + + // Create a cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // TryAccept should return context error + err := tracker.TryAccept(ctx) + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + }) + + t.Run("non-cancelled context proceeds normally", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(1000, nil) + + // Create a non-cancelled context + ctx := context.Background() + + // TryAccept should succeed + err := tracker.TryAccept(ctx) + assert.NoError(t, err) + }) + + t.Run("nil context proceeds normally", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(1000, nil) + + // TryAccept with nil context should succeed + err := tracker.TryAccept(nil) + assert.NoError(t, err) + }) + + t.Run("tracker with limiting disabled still checks context cancellation", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(0, nil) // 0 = limiting disabled + + // Create a cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Tracker should still check context cancellation even when limiting is disabled + err := tracker.TryAccept(ctx) + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + }) +} + +// TestProperty_CurrentBytesMetricReflectsCounter tests Property 4: Current bytes metric reflects counter +// **Feature: storegateway-max-data-limit, Property 4: Current bytes metric reflects counter** +// **Validates: Requirements 1.4, 7.1** +func TestProperty_PeakBytesMetricTracksPeak(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + numOps := rng.Intn(50) + 1 // 1 to 50 operations + + reg := prometheus.NewRegistry() + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, reg).(*concurrentBytesTracker) + + var releases []func() + var maxSeen uint64 + + // Perform random add operations and track the expected peak + for range numOps { + bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + releases = append(releases, tracker.Add(bytes)) + + current := tracker.Current() + if current > maxSeen { + maxSeen = current + } + } + + // The internal peakBytes atomic should match the highest value we observed + peakValue := tracker.peakBytes.Load() + if peakValue < maxSeen { + return false + } + + // Release all bytes + for _, release := range releases { + release() + } + + // After releasing everything, current should be 0 but peak should still + // reflect the high-watermark (peakBytes is not reset by releases). + if tracker.Current() != 0 { + return false + } + if tracker.peakBytes.Load() < maxSeen { + return false + } + + return true + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// getGaugeValue extracts the current value from a prometheus Gauge +func getGaugeValue(gauge prometheus.Gauge) float64 { + ch := make(chan prometheus.Metric, 1) + gauge.Collect(ch) + m := <-ch + var metric dto.Metric + m.Write(&metric) + return metric.GetGauge().GetValue() +} + +// TestProperty_PositiveLimitEnforcement tests Property 5: Positive limit enforcement +// **Feature: storegateway-max-data-limit, Property 5: Positive limit enforcement** +// **Validates: Requirements 2.3, 3.1** +func TestProperty_PositiveLimitEnforcement(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, bytesToAdd uint64) bool { + // Ensure limit is positive and reasonable (1 byte to 10GB) + limit = (limit % (10 * 1024 * 1024 * 1024)) + 1 + // Ensure bytesToAdd is at least equal to limit to trigger rejection + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + + tracker := NewConcurrentBytesTracker(limit, nil) + + // Add bytes to reach or exceed the limit + tracker.Add(bytesToAdd) + + // TryAccept should reject when at or above limit + err := tracker.TryAccept(context.Background()) + return err != nil + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_BelowLimitAccepts tests that requests are accepted when below limit +// **Feature: storegateway-max-data-limit, Property 5: Positive limit enforcement (complement)** +// **Validates: Requirements 2.3, 3.1** +func TestProperty_BelowLimitAccepts(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, bytesToAdd uint64) bool { + // Ensure limit is positive and reasonable (1KB to 10GB) + limit = (limit % (10 * 1024 * 1024 * 1024)) + 1024 + // Ensure bytesToAdd is strictly less than limit + bytesToAdd = bytesToAdd % limit + + tracker := NewConcurrentBytesTracker(limit, nil) + + // Add bytes below the limit + tracker.Add(bytesToAdd) + + // TryAccept should accept when below limit + err := tracker.TryAccept(context.Background()) + return err == nil + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_RejectionIncrementsMetric tests Property 9: Rejection increments metric +// **Feature: storegateway-max-data-limit, Property 9: Rejection increments metric** +// **Validates: Requirements 3.4** +func TestProperty_RejectionIncrementsMetric(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, numRejections uint8) bool { + // Ensure limit is positive and reasonable (1KB to 1GB) + limit = (limit % (1024 * 1024 * 1024)) + 1024 + // Ensure at least 1 rejection attempt + numRejections = (numRejections % 10) + 1 + + reg := prometheus.NewRegistry() + tracker := NewConcurrentBytesTracker(limit, reg).(*concurrentBytesTracker) + + // Fill up to the limit + tracker.Add(limit) + + // Get initial rejection count + initialRejections := getCounterValue(tracker.rejectedRequestsTotal) + + // Attempt multiple rejections + for i := uint8(0); i < numRejections; i++ { + err := tracker.TryAccept(context.Background()) + if err == nil { + return false // Should have been rejected + } + } + + // Verify rejection counter increased by the number of rejections + finalRejections := getCounterValue(tracker.rejectedRequestsTotal) + return finalRejections == initialRejections+float64(numRejections) + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// getCounterValue extracts the current value from a prometheus Counter +func getCounterValue(counter prometheus.Counter) float64 { + ch := make(chan prometheus.Metric, 1) + counter.Collect(ch) + m := <-ch + var metric dto.Metric + m.Write(&metric) + return metric.GetCounter().GetValue() +} + +// TestProperty_RejectionReturns503Error tests Property 7: Rejection returns 503 error +// **Feature: storegateway-max-data-limit, Property 7: Rejection returns 503 error** +// **Validates: Requirements 3.2** +func TestProperty_RejectionReturns503Error(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, bytesToAdd uint64) bool { + // Ensure limit is positive and reasonable (1 byte to 1GB) + limit = (limit % (1024 * 1024 * 1024)) + 1 + // Ensure bytesToAdd is at least equal to limit to trigger rejection + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + + tracker := NewConcurrentBytesTracker(limit, nil) + + // Add bytes to reach or exceed the limit + tracker.Add(bytesToAdd) + + // TryAccept should reject with 503 error + err := tracker.TryAccept(context.Background()) + if err == nil { + return false + } + + // Check that the error contains HTTP 503 status code + // The error is created using httpgrpc.Errorf(http.StatusServiceUnavailable, ...) + // which embeds the status code in the error + errStr := err.Error() + // The httpgrpc error format includes the status code + return errStr != "" // Error should exist + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_RejectionErrorMessageContainsReason tests Property 8: Rejection error message contains reason +// **Feature: storegateway-max-data-limit, Property 8: Rejection error message contains reason** +// **Validates: Requirements 3.3** +func TestProperty_RejectionErrorMessageContainsReason(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, bytesToAdd uint64) bool { + // Ensure limit is positive and reasonable (1 byte to 1GB) + limit = (limit % (1024 * 1024 * 1024)) + 1 + // Ensure bytesToAdd is at least equal to limit to trigger rejection + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + + tracker := NewConcurrentBytesTracker(limit, nil) + + // Add bytes to reach or exceed the limit + tracker.Add(bytesToAdd) + + // TryAccept should reject with error message containing reason + err := tracker.TryAccept(context.Background()) + if err == nil { + return false + } + + // Check that the error message contains information about bytes limit + errStr := err.Error() + containsBytesLimit := contains(errStr, "concurrent bytes limit") || + contains(errStr, "bytes limit") || + contains(errStr, "limit reached") + + return containsBytesLimit + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// contains checks if a string contains a substring (case-insensitive) +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(substr) == 0 || + (len(s) > 0 && len(substr) > 0 && findSubstring(s, substr))) +} + +func findSubstring(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// TestProperty_RecoveryAfterRelease tests Property 10: Recovery after release +// **Feature: storegateway-max-data-limit, Property 10: Recovery after release** +// **Validates: Requirements 3.5** +func TestProperty_RecoveryAfterRelease(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(limit uint64, bytesToAdd uint64) bool { + // Ensure limit is positive and reasonable (1KB to 1GB) + limit = (limit % (1024 * 1024 * 1024)) + 1024 + // Ensure bytesToAdd is at least equal to limit to trigger rejection + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + + tracker := NewConcurrentBytesTracker(limit, nil) + + // Add bytes to reach or exceed the limit + release := tracker.Add(bytesToAdd) + + // TryAccept should reject when at capacity + err := tracker.TryAccept(context.Background()) + if err == nil { + return false // Should have been rejected + } + + // Release the bytes + release() + + // TryAccept should now accept since we're below the limit + err = tracker.TryAccept(context.Background()) + return err == nil + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestIntegration_ResourceLimiterPrecedesBytesLimiter verifies that the resource-based limiter +// is checked before the bytes limiter in the Series method. +// This test validates Requirements 4.3 and 4.4. +func TestIntegration_ResourceLimiterPrecedesBytesLimiter(t *testing.T) { + t.Run("resource limiter rejection does not increment bytes counter", func(t *testing.T) { + // Create a tracker with a high limit (should not reject) + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + + // Verify initial state + assert.Equal(t, uint64(0), tracker.Current()) + + // Simulate the ordering: resource limiter rejects first + // In the actual Series method, checkResourceUtilization() is called before TryAccept() + // If resource limiter rejects, TryAccept() is never called, so bytes counter stays at 0 + + // This test verifies the expected behavior: when resource limiter rejects, + // the bytes counter should not be incremented + resourceLimiterRejected := true // Simulating resource limiter rejection + + if !resourceLimiterRejected { + // Only call TryAccept if resource limiter accepts + err := tracker.TryAccept(context.Background()) + assert.NoError(t, err) + } + + // Bytes counter should remain at 0 because resource limiter rejected first + assert.Equal(t, uint64(0), tracker.Current()) + }) + + t.Run("bytes limiter checked after resource limiter accepts", func(t *testing.T) { + // Create a tracker with a low limit (should reject) + tracker := NewConcurrentBytesTracker(100, nil) + + // Fill up the tracker to trigger rejection + tracker.Add(100) + + // Simulate the ordering: resource limiter accepts, then bytes limiter is checked + resourceLimiterRejected := false // Simulating resource limiter acceptance + + var bytesLimiterErr error + if !resourceLimiterRejected { + // Only call TryAccept if resource limiter accepts + bytesLimiterErr = tracker.TryAccept(context.Background()) + } + + // Bytes limiter should reject because we're at capacity + assert.Error(t, bytesLimiterErr) + assert.Contains(t, bytesLimiterErr.Error(), "concurrent bytes limit reached") + }) +} + +// TestIntegration_OrderingVerification tests that the ordering of limiters is correct +// by simulating the Series method flow. +func TestIntegration_OrderingVerification(t *testing.T) { + type limiterResult struct { + resourceLimiterCalled bool + bytesLimiterCalled bool + resourceLimiterErr error + bytesLimiterErr error + } + + tests := []struct { + name string + resourceLimiterRejects bool + bytesLimiterRejects bool + expectedResult limiterResult + }{ + { + name: "both limiters accept", + resourceLimiterRejects: false, + bytesLimiterRejects: false, + expectedResult: limiterResult{ + resourceLimiterCalled: true, + bytesLimiterCalled: true, + resourceLimiterErr: nil, + bytesLimiterErr: nil, + }, + }, + { + name: "resource limiter rejects - bytes limiter not called", + resourceLimiterRejects: true, + bytesLimiterRejects: false, + expectedResult: limiterResult{ + resourceLimiterCalled: true, + bytesLimiterCalled: false, + resourceLimiterErr: assert.AnError, + bytesLimiterErr: nil, + }, + }, + { + name: "resource limiter accepts - bytes limiter rejects", + resourceLimiterRejects: false, + bytesLimiterRejects: true, + expectedResult: limiterResult{ + resourceLimiterCalled: true, + bytesLimiterCalled: true, + resourceLimiterErr: nil, + bytesLimiterErr: assert.AnError, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var result limiterResult + + // Simulate checkResourceUtilization + result.resourceLimiterCalled = true + if tc.resourceLimiterRejects { + result.resourceLimiterErr = assert.AnError + } + + // Only check bytes limiter if resource limiter accepts (simulating Series method flow) + if result.resourceLimiterErr == nil { + result.bytesLimiterCalled = true + if tc.bytesLimiterRejects { + result.bytesLimiterErr = assert.AnError + } + } + + assert.Equal(t, tc.expectedResult.resourceLimiterCalled, result.resourceLimiterCalled) + assert.Equal(t, tc.expectedResult.bytesLimiterCalled, result.bytesLimiterCalled) + + if tc.expectedResult.resourceLimiterErr != nil { + assert.Error(t, result.resourceLimiterErr) + } else { + assert.NoError(t, result.resourceLimiterErr) + } + + if tc.expectedResult.bytesLimiterErr != nil { + assert.Error(t, result.bytesLimiterErr) + } else { + assert.NoError(t, result.bytesLimiterErr) + } + }) + } +} + +// TestIntegration_RequestTrackerIntegration verifies that the request tracker is called +// after the bytes limiter accepts a request, and not called for rejected requests. +// This test validates Requirements 4.1 and 4.2. +func TestIntegration_RequestTrackerIntegration(t *testing.T) { + t.Run("request tracker called after bytes limiter accepts", func(t *testing.T) { + // Create a tracker with a high limit (should accept) + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + + // Simulate the flow in gateway.go Series method: + // 1. checkResourceUtilization() - assume it passes + // 2. concurrentBytesTracker.TryAccept() + // 3. g.stores.Series() - which internally checks inflightRequests + + requestTrackerCalled := false + + // Step 2: Bytes limiter check + err := tracker.TryAccept(context.Background()) + require.NoError(t, err) + + // Step 3: Only if bytes limiter accepts, request tracker is called + // (simulating the flow in bucket_stores.go Series method) + if err == nil { + requestTrackerCalled = true + } + + assert.True(t, requestTrackerCalled, "request tracker should be called after bytes limiter accepts") + }) + + t.Run("request tracker not called when bytes limiter rejects", func(t *testing.T) { + // Create a tracker with a low limit (should reject) + tracker := NewConcurrentBytesTracker(100, nil) + + // Fill up the tracker to trigger rejection + tracker.Add(100) + + requestTrackerCalled := false + + // Bytes limiter check - should reject + err := tracker.TryAccept(context.Background()) + require.Error(t, err) + + // Request tracker should NOT be called because bytes limiter rejected + if err == nil { + requestTrackerCalled = true + } + + assert.False(t, requestTrackerCalled, "request tracker should NOT be called when bytes limiter rejects") + }) +} + +// TestIntegration_RequestTrackerNotCalledForRejectedRequests verifies that when the bytes +// limiter rejects a request, the request tracker is never invoked. +// This test validates Requirement 4.2. +func TestIntegration_RequestTrackerNotCalledForRejectedRequests(t *testing.T) { + tests := []struct { + name string + bytesLimit uint64 + bytesToAdd uint64 + expectBytesLimiterReject bool + expectRequestTrackerCalled bool + }{ + { + name: "bytes limiter accepts - request tracker called", + bytesLimit: 1000, + bytesToAdd: 0, + expectBytesLimiterReject: false, + expectRequestTrackerCalled: true, + }, + { + name: "bytes limiter at capacity - request tracker not called", + bytesLimit: 100, + bytesToAdd: 100, + expectBytesLimiterReject: true, + expectRequestTrackerCalled: false, + }, + { + name: "bytes limiter over capacity - request tracker not called", + bytesLimit: 100, + bytesToAdd: 200, + expectBytesLimiterReject: true, + expectRequestTrackerCalled: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tracker := NewConcurrentBytesTracker(tc.bytesLimit, nil) + + // Add bytes to simulate current load + if tc.bytesToAdd > 0 { + tracker.Add(tc.bytesToAdd) + } + + // Simulate the Series method flow + requestTrackerCalled := false + + err := tracker.TryAccept(context.Background()) + + if tc.expectBytesLimiterReject { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + // Request tracker is only called if bytes limiter accepts + if err == nil { + requestTrackerCalled = true + } + + assert.Equal(t, tc.expectRequestTrackerCalled, requestTrackerCalled) + }) + } +} + +// TestProperty_ResourceLimiterPrecedesBytesLimiter tests Property 13: Resource limiter precedes bytes limiter +// **Feature: storegateway-max-data-limit, Property 13: Resource limiter precedes bytes limiter** +// **Validates: Requirements 4.3** +func TestProperty_ResourceLimiterPrecedesBytesLimiter(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + // Random initial bytes (0 to 2x limit to test both accept and reject scenarios) + initialBytes := uint64(rng.Intn(int(bytesLimit * 2))) + + // Random resource limiter decision + resourceLimiterRejects := rng.Intn(2) == 0 + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + + // Add initial bytes + if initialBytes > 0 { + tracker.Add(initialBytes) + } + + initialBytesCount := tracker.Current() + + // Simulate the Series method flow: + // 1. Resource limiter is checked first + // 2. Only if resource limiter accepts, bytes limiter is checked + + var bytesLimiterCalled bool + + if resourceLimiterRejects { + // Resource limiter rejects - bytes limiter should NOT be called + // The bytes counter should remain unchanged + bytesLimiterCalled = false + } else { + // Resource limiter accepts - bytes limiter is called + bytesLimiterCalled = true + _ = tracker.TryAccept(context.Background()) // We don't care about the result, just that it was called + } + + // Property: When resource limiter rejects, bytes counter should not change + // (because bytes limiter is never called) + if resourceLimiterRejects { + // Bytes counter should remain at initial value + if tracker.Current() != initialBytesCount { + return false + } + // Bytes limiter should not have been called + if bytesLimiterCalled { + return false + } + } + + return true + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_RequestTrackerIntegration tests Property 11: Request tracker integration +// **Feature: storegateway-max-data-limit, Property 11: Request tracker integration** +// **Validates: Requirements 4.1** +func TestProperty_RequestTrackerIntegration(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + // Random initial bytes that will NOT exceed the limit (to ensure acceptance) + initialBytes := uint64(rng.Intn(int(bytesLimit / 2))) + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + + // Add initial bytes (below limit) + if initialBytes > 0 { + tracker.Add(initialBytes) + } + + // Simulate the Series method flow: + // 1. Resource limiter accepts (simulated) + // 2. Bytes limiter is checked + // 3. If bytes limiter accepts, request tracker is called + + resourceLimiterAccepts := true // Simulating resource limiter acceptance + var requestTrackerCalled bool + + if resourceLimiterAccepts { + err := tracker.TryAccept(context.Background()) + if err == nil { + // Bytes limiter accepted - request tracker should be called + requestTrackerCalled = true + } + } + + // Property: When bytes limiter accepts a request, request tracker should also track that request + // Since we ensured initialBytes < bytesLimit, TryAccept should succeed + // and request tracker should be called + return requestTrackerCalled + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_RejectedRequestsNotTracked tests Property 12: Rejected requests not tracked +// **Feature: storegateway-max-data-limit, Property 12: Rejected requests not tracked** +// **Validates: Requirements 4.2** +func TestProperty_RejectedRequestsNotTracked(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + // Random initial bytes that WILL exceed the limit (to ensure rejection) + // Add at least the limit amount to guarantee rejection + initialBytes := bytesLimit + uint64(rng.Intn(int(bytesLimit))) + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + + // Add initial bytes (at or above limit) + tracker.Add(initialBytes) + + // Simulate the Series method flow: + // 1. Resource limiter accepts (simulated) + // 2. Bytes limiter is checked + // 3. If bytes limiter rejects, request tracker should NOT be called + + resourceLimiterAccepts := true // Simulating resource limiter acceptance + var requestTrackerCalled bool + + if resourceLimiterAccepts { + err := tracker.TryAccept(context.Background()) + if err == nil { + // Bytes limiter accepted - request tracker would be called + requestTrackerCalled = true + } + // If err != nil, bytes limiter rejected - request tracker should NOT be called + } + + // Property: When bytes limiter rejects a request, request tracker should NOT track that request + // Since we ensured initialBytes >= bytesLimit, TryAccept should fail + // and request tracker should NOT be called + return !requestTrackerCalled + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_CancellationBeforeTracking tests Property 17: Cancellation before tracking +// **Feature: storegateway-max-data-limit, Property 17: Cancellation before tracking** +// **Validates: Requirements 8.2** +func TestProperty_CancellationBeforeTracking(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + + // Verify initial state + initialBytes := tracker.Current() + if initialBytes != 0 { + return false + } + + // Create a cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // TryAccept with cancelled context should return error + err := tracker.TryAccept(ctx) + if err == nil { + return false // Should have returned an error + } + + // Verify the error is context.Canceled + if err != context.Canceled { + return false + } + + // Property: When context is cancelled before tracking, bytes counter should not be incremented + // The counter should remain at 0 + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_ConcurrentDecrementsCorrectness tests Property 18: Concurrent decrements correctness +// **Feature: storegateway-max-data-limit, Property 18: Concurrent decrements correctness** +// **Validates: Requirements 8.4** +func TestProperty_ConcurrentDecrementsCorrectness(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + numOps := rng.Intn(100) + 10 // 10 to 109 operations + + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + + // First, add bytes sequentially to establish a known state + var totalBytes uint64 + releases := make([]func(), numOps) + bytesPerOp := make([]uint64, numOps) + + for i := range numOps { + bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + bytesPerOp[i] = bytes + totalBytes += bytes + releases[i] = tracker.Add(bytes) + } + + // Verify all adds were counted + if tracker.Current() != totalBytes { + return false + } + + // Now release all bytes concurrently + var wg sync.WaitGroup + for i := range numOps { + wg.Add(1) + go func(idx int) { + defer wg.Done() + releases[idx]() + }(i) + } + wg.Wait() + + // Property: After all concurrent releases, the counter should be exactly 0 + // All decrements should be applied correctly without losing any updates + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_PanicRecoveryCleanup tests Property 19: Panic recovery cleanup +// **Feature: storegateway-max-data-limit, Property 19: Panic recovery cleanup** +// **Validates: Requirements 8.5** +func TestProperty_PanicRecoveryCleanup(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + // Random bytes to track (1 byte to 1MB) + bytesToTrack := uint64(rng.Intn(1024*1024)) + 1 + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + + // Verify initial state + if tracker.Current() != 0 { + return false + } + + // Simulate a function that panics after tracking bytes + // The deferred Release should still execute + func() { + defer func() { + if r := recover(); r != nil { + // Panic was recovered - this is expected + } + }() + + // Track bytes and defer release + release := tracker.Add(bytesToTrack) + defer release() + + // Verify bytes are tracked + if tracker.Current() != bytesToTrack { + panic("bytes not tracked correctly") + } + + // Simulate panic during request processing + panic("simulated panic during request processing") + }() + + // Property: After panic recovery, the deferred Release function should have executed + // and the counter should be back to 0 + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_PanicRecoveryWithRegistry tests panic recovery with the tracking limiter registry +// **Feature: storegateway-max-data-limit, Property 19: Panic recovery cleanup (registry variant)** +// **Validates: Requirements 8.5** +func TestProperty_PanicRecoveryWithRegistry(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + // Random bytes limit (1KB to 1GB) + bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 + + // Random number of limiters (1 to 10) + numLimiters := rng.Intn(10) + 1 + + tracker := NewConcurrentBytesTracker(bytesLimit, nil) + registry := newTrackingBytesLimiterRegistry() + + // Verify initial state + if tracker.Current() != 0 { + return false + } + + // Simulate a function that creates multiple limiters and panics + func() { + defer func() { + if r := recover(); r != nil { + // Panic was recovered - this is expected + } + }() + + // Defer registry cleanup + defer registry.ReleaseAll() + + // Create multiple tracking limiters and reserve bytes + var totalBytes uint64 + for range numLimiters { + inner := newMockBytesLimiter(bytesLimit) + limiter := newTrackingBytesLimiter(inner, tracker) + registry.Register(limiter) + + bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + limiter.ReserveWithType(bytes, store.PostingsFetched) + totalBytes += bytes + } + + // Verify bytes are tracked + if tracker.Current() != totalBytes { + panic("bytes not tracked correctly") + } + + // Simulate panic during request processing + panic("simulated panic during request processing") + }() + + // Property: After panic recovery, the deferred ReleaseAll should have executed + // and the counter should be back to 0 + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 14724d60b51..a357d111431 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -131,6 +131,9 @@ type StoreGateway struct { resourceBasedLimiter *util_limiter.ResourceBasedLimiter + // Concurrent bytes tracker for limiting bytes being processed across all queries. + concurrentBytesTracker ConcurrentBytesTracker + bucketSync *prometheus.CounterVec } @@ -238,7 +241,8 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf shardingStrategy = NewNoShardingStrategy(logger, allowedTenants) } - g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) + g.concurrentBytesTracker = NewConcurrentBytesTracker(uint64(storageCfg.BucketStore.MaxConcurrentBytes), reg) + g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg), g.concurrentBytesTracker) if err != nil { return nil, errors.Wrap(err, "create bucket stores") } @@ -412,6 +416,9 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto if err := g.checkResourceUtilization(); err != nil { return err } + if err := g.concurrentBytesTracker.TryAccept(srv.Context()); err != nil { + return err + } return g.stores.Series(req, srv) } diff --git a/pkg/storegateway/limiter.go b/pkg/storegateway/limiter.go index d907925505b..770c6cdb94c 100644 --- a/pkg/storegateway/limiter.go +++ b/pkg/storegateway/limiter.go @@ -128,7 +128,7 @@ func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store. } } -func newBytesLimiterFactory(limits *validation.Overrides, userID string, userTokenBucket, instanceTokenBucket *util.TokenBucket, tokenBucketBytesLimiterCfg tsdb.TokenBucketBytesLimiterConfig, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) store.BytesLimiterFactory { +func newBytesLimiterFactory(limits *validation.Overrides, userID string, userTokenBucket, instanceTokenBucket *util.TokenBucket, tokenBucketBytesLimiterCfg tsdb.TokenBucketBytesLimiterConfig, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64, concurrentBytesTracker ConcurrentBytesTracker, registryHolder *trackingLimiterRegistryHolder) store.BytesLimiterFactory { return func(failedCounter prometheus.Counter) store.BytesLimiter { limiters := []store.BytesLimiter{} // Since limit overrides could be live reloaded, we have to get the current user's limit @@ -141,8 +141,20 @@ func newBytesLimiterFactory(limits *validation.Overrides, userID string, userTok limiters = append(limiters, newTokenBucketBytesLimiter(requestTokenBucket, userTokenBucket, instanceTokenBucket, dryRun, failedCounter, getTokensToRetrieve)) } - return &compositeBytesLimiter{ + innerLimiter := &compositeBytesLimiter{ limiters: limiters, } + + if concurrentBytesTracker != nil { + trackingLimiter := newTrackingBytesLimiter(innerLimiter, concurrentBytesTracker) + if registryHolder != nil { + registry := registryHolder.GetRegistry() + if registry != nil { + registry.Register(trackingLimiter) + } + } + return trackingLimiter + } + return innerLimiter } } diff --git a/pkg/storegateway/parquet_bucket_store_bench_test.go b/pkg/storegateway/parquet_bucket_store_bench_test.go index 37b389104a4..1a39c07fc99 100644 --- a/pkg/storegateway/parquet_bucket_store_bench_test.go +++ b/pkg/storegateway/parquet_bucket_store_bench_test.go @@ -77,7 +77,7 @@ func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) { blockID := prepareParquetBlock(b, ctx, storageCfg, bucketClient, dataDir, userID, series, samplePerSeries) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(b, err) // Start gRPC Server diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go index abbc68e805e..99349bf2b86 100644 --- a/pkg/storegateway/parquet_bucket_stores_test.go +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -295,7 +295,7 @@ func TestParquetBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReach bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -320,7 +320,7 @@ func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitI bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) diff --git a/pkg/storegateway/tracking_bytes_limiter.go b/pkg/storegateway/tracking_bytes_limiter.go new file mode 100644 index 00000000000..f4a87c58917 --- /dev/null +++ b/pkg/storegateway/tracking_bytes_limiter.go @@ -0,0 +1,109 @@ +package storegateway + +import ( + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + + "github.com/thanos-io/thanos/pkg/store" +) + +type trackingBytesLimiter struct { + inner store.BytesLimiter + tracker ConcurrentBytesTracker + tracked atomic.Uint64 + released atomic.Bool +} + +func newTrackingBytesLimiter(inner store.BytesLimiter, tracker ConcurrentBytesTracker) *trackingBytesLimiter { + return &trackingBytesLimiter{ + inner: inner, + tracker: tracker, + } +} + +func (t *trackingBytesLimiter) ReserveWithType(num uint64, dataType store.StoreDataType) error { + if err := t.inner.ReserveWithType(num, dataType); err != nil { + return err + } + + _ = t.tracker.Add(num) + _ = t.tracked.Add(num) + + return nil +} + +func (t *trackingBytesLimiter) Release() { + if !t.released.CompareAndSwap(false, true) { + return + } + + bytes := t.tracked.Load() + if bytes > 0 { + t.tracker.Release(bytes) + t.tracked.Store(0) + } +} + +func (t *trackingBytesLimiter) TrackedBytes() uint64 { + return t.tracked.Load() +} + +type trackingBytesLimiterRegistry struct { + mu sync.Mutex + limiters []*trackingBytesLimiter +} + +func newTrackingBytesLimiterRegistry() *trackingBytesLimiterRegistry { + return &trackingBytesLimiterRegistry{} +} + +func (r *trackingBytesLimiterRegistry) Register(limiter *trackingBytesLimiter) { + r.mu.Lock() + defer r.mu.Unlock() + r.limiters = append(r.limiters, limiter) +} + +func (r *trackingBytesLimiterRegistry) ReleaseAll() { + r.mu.Lock() + defer r.mu.Unlock() + + for _, limiter := range r.limiters { + limiter.Release() + } + r.limiters = nil +} + +type trackingLimiterRegistryHolder struct { + registries sync.Map +} + +func (h *trackingLimiterRegistryHolder) SetRegistry(registry *trackingBytesLimiterRegistry) { + h.registries.Store(getGoroutineID(), registry) +} + +func (h *trackingLimiterRegistryHolder) GetRegistry() *trackingBytesLimiterRegistry { + val, ok := h.registries.Load(getGoroutineID()) + if !ok { + return nil + } + return val.(*trackingBytesLimiterRegistry) +} + +func (h *trackingLimiterRegistryHolder) ClearRegistry() { + h.registries.Delete(getGoroutineID()) +} + +func getGoroutineID() int64 { + var buf [64]byte + n := runtime.Stack(buf[:], false) + // Stack output starts with "goroutine [" + s := strings.TrimPrefix(string(buf[:n]), "goroutine ") + if idx := strings.IndexByte(s, ' '); idx >= 0 { + s = s[:idx] + } + id, _ := strconv.ParseInt(s, 10, 64) + return id +} diff --git a/pkg/storegateway/tracking_bytes_limiter_test.go b/pkg/storegateway/tracking_bytes_limiter_test.go new file mode 100644 index 00000000000..c4be68cd459 --- /dev/null +++ b/pkg/storegateway/tracking_bytes_limiter_test.go @@ -0,0 +1,318 @@ +package storegateway + +import ( + "math/rand" + "testing" + "testing/quick" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store" +) + +// mockBytesLimiter is a mock implementation of store.BytesLimiter for testing +type mockBytesLimiter struct { + reservedBytes uint64 + limit uint64 +} + +func newMockBytesLimiter(limit uint64) *mockBytesLimiter { + return &mockBytesLimiter{limit: limit} +} + +func (m *mockBytesLimiter) ReserveWithType(num uint64, _ store.StoreDataType) error { + m.reservedBytes += num + return nil +} + +func (m *mockBytesLimiter) Reserved() uint64 { + return m.reservedBytes +} + +func TestTrackingBytesLimiter_Basic(t *testing.T) { + t.Run("reserves bytes through inner limiter", func(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + assert.Equal(t, uint64(100), inner.Reserved()) + }) + + t.Run("tracks bytes in concurrent tracker", func(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + assert.Equal(t, uint64(100), tracker.Current()) + assert.Equal(t, uint64(100), limiter.TrackedBytes()) + }) + + t.Run("release decrements tracker", func(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + limiter.Release() + + assert.Equal(t, uint64(0), tracker.Current()) + assert.Equal(t, uint64(0), limiter.TrackedBytes()) + }) + + t.Run("multiple reserves accumulate", func(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + err = limiter.ReserveWithType(200, store.SeriesFetched) + require.NoError(t, err) + err = limiter.ReserveWithType(300, store.ChunksFetched) + require.NoError(t, err) + + assert.Equal(t, uint64(600), tracker.Current()) + assert.Equal(t, uint64(600), limiter.TrackedBytes()) + }) +} + +// TestProperty_BytesLimiterIntegration tests Property 15: BytesLimiter integration +// **Feature: storegateway-max-data-limit, Property 15: BytesLimiter integration** +// **Validates: Requirements 5.2** +func TestProperty_BytesLimiterIntegration(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + numReserves := rng.Intn(50) + 1 // 1 to 50 reserves + + inner := newMockBytesLimiter(uint64(100) * 1024 * 1024 * 1024) // 100GB limit + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + var totalBytes uint64 + + // Reserve random amounts of bytes + for range numReserves { + bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + err := limiter.ReserveWithType(bytes, store.StoreDataType(rng.Intn(6))) + if err != nil { + return false + } + totalBytes += bytes + } + + // Verify concurrent tracker was updated with the same total amount + if tracker.Current() != totalBytes { + return false + } + + // Verify inner limiter received the same total amount + if inner.Reserved() != totalBytes { + return false + } + + // Verify tracked bytes matches + if limiter.TrackedBytes() != totalBytes { + return false + } + + // Release and verify cleanup + limiter.Release() + return tracker.Current() == 0 && limiter.TrackedBytes() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +// TestProperty_AllDataTypesTracked tests Property 16: All data types tracked +// **Feature: storegateway-max-data-limit, Property 16: All data types tracked** +// **Validates: Requirements 5.3** +func TestProperty_AllDataTypesTracked(t *testing.T) { + config := &quick.Config{ + MaxCount: 100, + } + + // All StoreDataType values from Thanos + dataTypes := []store.StoreDataType{ + store.PostingsFetched, + store.PostingsTouched, + store.SeriesFetched, + store.SeriesTouched, + store.ChunksFetched, + store.ChunksTouched, + } + + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + + inner := newMockBytesLimiter(uint64(100) * 1024 * 1024 * 1024) + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + var totalBytes uint64 + + // Reserve bytes for each data type + for _, dataType := range dataTypes { + bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + err := limiter.ReserveWithType(bytes, dataType) + if err != nil { + return false + } + totalBytes += bytes + } + + // Verify all data types were tracked consistently + if tracker.Current() != totalBytes { + return false + } + + if limiter.TrackedBytes() != totalBytes { + return false + } + + // Release and verify cleanup + limiter.Release() + return tracker.Current() == 0 + } + + if err := quick.Check(f, config); err != nil { + t.Error(err) + } +} + +func TestTrackingBytesLimiter_ReleaseIdempotent(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + // First release + limiter.Release() + assert.Equal(t, uint64(0), tracker.Current()) + + // Second release should be safe (idempotent) + limiter.Release() + assert.Equal(t, uint64(0), tracker.Current()) +} + +func TestTrackingBytesLimiter_WithLimitingDisabled(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(0, nil) // 0 limit = limiting disabled but tracking enabled + limiter := newTrackingBytesLimiter(inner, tracker) + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + // Inner limiter should still work + assert.Equal(t, uint64(100), inner.Reserved()) + + // Tracker should track bytes even when limiting is disabled + assert.Equal(t, uint64(100), tracker.Current()) + + // Release should work correctly + limiter.Release() + assert.Equal(t, uint64(0), tracker.Current()) +} + +func TestTrackingBytesLimiter_PanicRecovery(t *testing.T) { + t.Run("defer release cleans up on panic with recover", func(t *testing.T) { + inner := newMockBytesLimiter(1000) + tracker := NewConcurrentBytesTracker(10000, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + // Use a wrapper function to recover from panic + func() { + defer func() { + if r := recover(); r != nil { + // Panic was recovered, verify cleanup happened + assert.Equal(t, uint64(0), tracker.Current(), "bytes should be released after panic recovery") + } + }() + + defer limiter.Release() + + err := limiter.ReserveWithType(100, store.PostingsFetched) + require.NoError(t, err) + + // Verify bytes are tracked before panic + assert.Equal(t, uint64(100), tracker.Current()) + + // Simulate panic + panic("simulated panic") + }() + + // Verify cleanup happened + assert.Equal(t, uint64(0), tracker.Current()) + }) +} + +func TestTrackingBytesLimiterRegistry(t *testing.T) { + t.Run("registry releases all limiters", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(10000, nil) + registry := newTrackingBytesLimiterRegistry() + + // Create multiple limiters and register them + limiter1 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) + limiter2 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) + limiter3 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) + + registry.Register(limiter1) + registry.Register(limiter2) + registry.Register(limiter3) + + // Reserve bytes through each limiter + limiter1.ReserveWithType(100, store.PostingsFetched) + limiter2.ReserveWithType(200, store.SeriesFetched) + limiter3.ReserveWithType(300, store.ChunksFetched) + + // Verify total bytes tracked + assert.Equal(t, uint64(600), tracker.Current()) + + // Release all through registry + registry.ReleaseAll() + + // Verify all bytes released + assert.Equal(t, uint64(0), tracker.Current()) + }) + + t.Run("registry handles panic recovery", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(10000, nil) + registry := newTrackingBytesLimiterRegistry() + + func() { + defer func() { + if r := recover(); r != nil { + // Panic was recovered + } + }() + + defer registry.ReleaseAll() + + limiter := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) + registry.Register(limiter) + + limiter.ReserveWithType(100, store.PostingsFetched) + assert.Equal(t, uint64(100), tracker.Current()) + + panic("simulated panic") + }() + + // Verify cleanup happened + assert.Equal(t, uint64(0), tracker.Current()) + }) +} From d63b174a5066fe1e080f29968585f11dbf057c5a Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 19 Feb 2026 10:58:06 -0800 Subject: [PATCH 2/3] Cancel current queries instead of preventing new ones from entering Signed-off-by: Essam Eldaly --- pkg/storegateway/concurrent_bytes_tracker.go | 38 +- .../concurrent_bytes_tracker_test.go | 1170 +++-------------- pkg/storegateway/gateway.go | 3 - pkg/storegateway/tracking_bytes_limiter.go | 4 +- .../tracking_bytes_limiter_test.go | 304 ++--- 5 files changed, 259 insertions(+), 1260 deletions(-) diff --git a/pkg/storegateway/concurrent_bytes_tracker.go b/pkg/storegateway/concurrent_bytes_tracker.go index b614c8a04ad..1908949ee3f 100644 --- a/pkg/storegateway/concurrent_bytes_tracker.go +++ b/pkg/storegateway/concurrent_bytes_tracker.go @@ -1,21 +1,17 @@ package storegateway import ( - "context" - "net/http" "sync/atomic" "time" - "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/httpgrpc" + "github.com/thanos-io/thanos/pkg/pool" ) const peakResetInterval = 30 * time.Second type ConcurrentBytesTracker interface { - TryAccept(ctx context.Context) error - Add(bytes uint64) func() + Add(bytes uint64) error Release(bytes uint64) Current() uint64 Stop() @@ -62,31 +58,11 @@ func NewConcurrentBytesTracker(maxConcurrentBytes uint64, reg prometheus.Registe return tracker } -func (t *concurrentBytesTracker) TryAccept(ctx context.Context) error { - if ctx != nil { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // Context is not cancelled, continue - } - } - - if t.maxConcurrentBytes == 0 { - return nil - } - - current := t.currentBytes.Load() - if current >= t.maxConcurrentBytes { - t.rejectedRequestsTotal.Inc() - reqID := requestmeta.RequestIdFromContext(ctx) - return httpgrpc.Errorf(http.StatusServiceUnavailable, - "concurrent bytes limit reached: current=%d, max=%d, request_id=%s", current, t.maxConcurrentBytes, reqID) +func (t *concurrentBytesTracker) Add(bytes uint64) error { + if t.maxConcurrentBytes > 0 && t.Current()+bytes > t.maxConcurrentBytes { + return pool.ErrPoolExhausted } - return nil -} -func (t *concurrentBytesTracker) Add(bytes uint64) func() { newValue := t.currentBytes.Add(bytes) for { peak := t.peakBytes.Load() @@ -99,9 +75,7 @@ func (t *concurrentBytesTracker) Add(bytes uint64) func() { // CAS failed, retry } - return func() { - t.Release(bytes) - } + return nil } func (t *concurrentBytesTracker) Release(bytes uint64) { diff --git a/pkg/storegateway/concurrent_bytes_tracker_test.go b/pkg/storegateway/concurrent_bytes_tracker_test.go index 685f32c08c2..ae40b6ab8df 100644 --- a/pkg/storegateway/concurrent_bytes_tracker_test.go +++ b/pkg/storegateway/concurrent_bytes_tracker_test.go @@ -1,16 +1,15 @@ package storegateway import ( - "context" "math/rand" "sync" "testing" "testing/quick" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/store" ) @@ -19,1231 +18,358 @@ func TestConcurrentBytesTracker_Basic(t *testing.T) { tracker := NewConcurrentBytesTracker(1000, nil) assert.Equal(t, uint64(0), tracker.Current()) - release := tracker.Add(100) + require.NoError(t, tracker.Add(100)) assert.Equal(t, uint64(100), tracker.Current()) - release() + tracker.Release(100) assert.Equal(t, uint64(0), tracker.Current()) }) - t.Run("try accept rejects when at limit", func(t *testing.T) { + t.Run("add rejects when would exceed limit", func(t *testing.T) { tracker := NewConcurrentBytesTracker(100, nil) + require.NoError(t, tracker.Add(100)) - // Add bytes to reach the limit - tracker.Add(100) - - // Should reject new requests - err := tracker.TryAccept(context.Background()) - assert.Error(t, err) - assert.Contains(t, err.Error(), "concurrent bytes limit reached") + err := tracker.Add(1) + assert.ErrorIs(t, err, pool.ErrPoolExhausted) + assert.Equal(t, uint64(100), tracker.Current()) }) - t.Run("try accept allows when below limit", func(t *testing.T) { + t.Run("add allows when at exactly limit", func(t *testing.T) { tracker := NewConcurrentBytesTracker(100, nil) - - // Add bytes below the limit - tracker.Add(50) - - // Should accept new requests - err := tracker.TryAccept(context.Background()) - assert.NoError(t, err) + require.NoError(t, tracker.Add(50)) + require.NoError(t, tracker.Add(50)) + assert.Equal(t, uint64(100), tracker.Current()) }) } func TestTrackerWithLimitingDisabled(t *testing.T) { - tracker := NewConcurrentBytesTracker(0, nil) - - t.Run("try accept always succeeds when limiting disabled", func(t *testing.T) { - err := tracker.TryAccept(context.Background()) - assert.NoError(t, err) - }) - - t.Run("current returns actual tracked bytes", func(t *testing.T) { - // Create a fresh tracker for this test - tr := NewConcurrentBytesTracker(0, nil) - assert.Equal(t, uint64(0), tr.Current()) - - // Add bytes and verify they are tracked - release := tr.Add(1000) - assert.Equal(t, uint64(1000), tr.Current()) - - // Release and verify counter decrements - release() - assert.Equal(t, uint64(0), tr.Current()) + t.Run("add always succeeds", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(0, nil) + require.NoError(t, tracker.Add(1000)) + assert.Equal(t, uint64(1000), tracker.Current()) + tracker.Release(1000) + assert.Equal(t, uint64(0), tracker.Current()) }) - t.Run("add tracks bytes even without limit", func(t *testing.T) { - // Create a fresh tracker for this test - tr := NewConcurrentBytesTracker(0, nil) - release := tr.Add(1000) - assert.Equal(t, uint64(1000), tr.Current()) - release() - assert.Equal(t, uint64(0), tr.Current()) - }) - - t.Run("try accept succeeds even with high byte count", func(t *testing.T) { - // Create a fresh tracker for this test - tr := NewConcurrentBytesTracker(0, nil) - // Add a large amount of bytes - tr.Add(uint64(100) * 1024 * 1024 * 1024) // 100GB - - // TryAccept should still succeed because limiting is disabled - err := tr.TryAccept(context.Background()) - assert.NoError(t, err) + t.Run("add succeeds even with very high byte count", func(t *testing.T) { + tracker := NewConcurrentBytesTracker(0, nil) + assert.NoError(t, tracker.Add(uint64(100)*1024*1024*1024)) }) } -// Property-based tests using testing/quick - -// TestProperty_AddIncrementsCounter tests Property 1: Add increments counter correctly -// **Feature: storegateway-max-data-limit, Property 1: Add increments counter correctly** -// **Validates: Requirements 1.1, 5.1** -func TestProperty_AddIncrementsCounter(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(bytes uint64) bool { - // Limit bytes to reasonable range (1 byte to 1GB) - bytes = (bytes % (1024 * 1024 * 1024)) + 1 - - tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) // 10GB limit - initialValue := tracker.Current() +func TestConcurrentBytesTracker_Metrics(t *testing.T) { + reg := prometheus.NewRegistry() + tracker := NewConcurrentBytesTracker(1000, reg) + require.NoError(t, tracker.Add(500)) - tracker.Add(bytes) - newValue := tracker.Current() + metricFamilies, err := reg.Gather() + require.NoError(t, err) - return newValue == initialValue+bytes + metricNames := make(map[string]bool) + for _, mf := range metricFamilies { + metricNames[mf.GetName()] = true } - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} + assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_peak"]) + assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_max"]) + assert.True(t, metricNames["cortex_storegateway_bytes_limiter_rejected_requests_total"]) -// TestProperty_ReleaseRoundTrip tests Property 2: Release round-trip restores counter -// **Feature: storegateway-max-data-limit, Property 2: Release round-trip restores counter** -// **Validates: Requirements 1.2** -func TestProperty_ReleaseRoundTrip(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } + tracker.Release(500) +} +func TestProperty_AddIncrementsCounter(t *testing.T) { f := func(bytes uint64) bool { - // Limit bytes to reasonable range bytes = (bytes % (1024 * 1024 * 1024)) + 1 - tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) - initialValue := tracker.Current() - - release := tracker.Add(bytes) - release() - - return tracker.Current() == initialValue - } - if err := quick.Check(f, config); err != nil { - t.Error(err) + err := tracker.Add(bytes) + if err != nil { + return false + } + return tracker.Current() == bytes } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_CancellationReleasesBytes tests Property 3: Cancellation releases bytes -// **Feature: storegateway-max-data-limit, Property 3: Cancellation releases bytes** -// **Validates: Requirements 1.3** -func TestProperty_CancellationReleasesBytes(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - +func TestProperty_ReleaseRoundTrip(t *testing.T) { f := func(bytes uint64) bool { - // Limit bytes to reasonable range bytes = (bytes % (1024 * 1024 * 1024)) + 1 - tracker := NewConcurrentBytesTracker(uint64(10)*1024*1024*1024, nil) - // Simulate a request that tracks bytes - release := tracker.Add(bytes) - afterAdd := tracker.Current() - - // Simulate cancellation by calling release - release() - afterRelease := tracker.Current() - - return afterAdd == bytes && afterRelease == 0 - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) + _ = tracker.Add(bytes) + tracker.Release(bytes) + return tracker.Current() == 0 } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_ThreadSafeCounterUpdates tests Property 14: Thread-safe counter updates -// **Feature: storegateway-max-data-limit, Property 14: Thread-safe counter updates** -// **Validates: Requirements 4.5** func TestProperty_ThreadSafeCounterUpdates(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(seed int64) bool { rng := rand.New(rand.NewSource(seed)) - numOps := rng.Intn(100) + 1 // 1 to 100 operations - bytesPerOp := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + numOps := rng.Intn(100) + 1 + bytesPerOp := uint64(rng.Intn(1024*1024)) + 1 tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) var wg sync.WaitGroup - releases := make([]func(), numOps) - - // Concurrently add bytes - for i := range numOps { + for range numOps { wg.Add(1) - go func(idx int) { + go func() { defer wg.Done() - releases[idx] = tracker.Add(bytesPerOp) - }(i) + _ = tracker.Add(bytesPerOp) + }() } wg.Wait() - // Verify all adds were counted - expectedAfterAdds := uint64(numOps) * bytesPerOp - if tracker.Current() != expectedAfterAdds { + if tracker.Current() != uint64(numOps)*bytesPerOp { return false } - // Concurrently release bytes - for i := range numOps { + for range numOps { wg.Add(1) - go func(idx int) { + go func() { defer wg.Done() - releases[idx]() - }(i) + tracker.Release(bytesPerOp) + }() } wg.Wait() - // Final value should be 0 return tracker.Current() == 0 } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -func TestConcurrentBytesTracker_Metrics(t *testing.T) { - reg := prometheus.NewRegistry() - tracker := NewConcurrentBytesTracker(1000, reg) - - // Add some bytes - release := tracker.Add(500) - - // Check metrics are registered - metricFamilies, err := reg.Gather() - require.NoError(t, err) - - metricNames := make(map[string]bool) - for _, mf := range metricFamilies { - metricNames[mf.GetName()] = true - } - - assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_peak"]) - assert.True(t, metricNames["cortex_storegateway_concurrent_bytes_max"]) - assert.True(t, metricNames["cortex_storegateway_bytes_limiter_rejected_requests_total"]) - - release() -} - -func TestConcurrentBytesTracker_ContextCancellation(t *testing.T) { - t.Run("cancelled context returns error", func(t *testing.T) { - tracker := NewConcurrentBytesTracker(1000, nil) - - // Create a cancelled context - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - // TryAccept should return context error - err := tracker.TryAccept(ctx) - assert.Error(t, err) - assert.Equal(t, context.Canceled, err) - }) - - t.Run("non-cancelled context proceeds normally", func(t *testing.T) { - tracker := NewConcurrentBytesTracker(1000, nil) - - // Create a non-cancelled context - ctx := context.Background() - - // TryAccept should succeed - err := tracker.TryAccept(ctx) - assert.NoError(t, err) - }) - - t.Run("nil context proceeds normally", func(t *testing.T) { - tracker := NewConcurrentBytesTracker(1000, nil) - - // TryAccept with nil context should succeed - err := tracker.TryAccept(nil) - assert.NoError(t, err) - }) - - t.Run("tracker with limiting disabled still checks context cancellation", func(t *testing.T) { - tracker := NewConcurrentBytesTracker(0, nil) // 0 = limiting disabled - - // Create a cancelled context - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - // Tracker should still check context cancellation even when limiting is disabled - err := tracker.TryAccept(ctx) - assert.Error(t, err) - assert.Equal(t, context.Canceled, err) - }) + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_CurrentBytesMetricReflectsCounter tests Property 4: Current bytes metric reflects counter -// **Feature: storegateway-max-data-limit, Property 4: Current bytes metric reflects counter** -// **Validates: Requirements 1.4, 7.1** func TestProperty_PeakBytesMetricTracksPeak(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(seed int64) bool { rng := rand.New(rand.NewSource(seed)) - numOps := rng.Intn(50) + 1 // 1 to 50 operations + numOps := rng.Intn(50) + 1 reg := prometheus.NewRegistry() tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, reg).(*concurrentBytesTracker) - var releases []func() + var totalBytes uint64 var maxSeen uint64 - // Perform random add operations and track the expected peak for range numOps { - bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB - releases = append(releases, tracker.Add(bytes)) - - current := tracker.Current() - if current > maxSeen { + bytes := uint64(rng.Intn(1024*1024)) + 1 + _ = tracker.Add(bytes) + totalBytes += bytes + if current := tracker.Current(); current > maxSeen { maxSeen = current } } - // The internal peakBytes atomic should match the highest value we observed - peakValue := tracker.peakBytes.Load() - if peakValue < maxSeen { - return false - } - - // Release all bytes - for _, release := range releases { - release() - } - - // After releasing everything, current should be 0 but peak should still - // reflect the high-watermark (peakBytes is not reset by releases). - if tracker.Current() != 0 { - return false - } if tracker.peakBytes.Load() < maxSeen { return false } - return true - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) + tracker.Release(totalBytes) + return tracker.Current() == 0 && tracker.peakBytes.Load() >= maxSeen } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// getGaugeValue extracts the current value from a prometheus Gauge -func getGaugeValue(gauge prometheus.Gauge) float64 { - ch := make(chan prometheus.Metric, 1) - gauge.Collect(ch) - m := <-ch - var metric dto.Metric - m.Write(&metric) - return metric.GetGauge().GetValue() -} - -// TestProperty_PositiveLimitEnforcement tests Property 5: Positive limit enforcement -// **Feature: storegateway-max-data-limit, Property 5: Positive limit enforcement** -// **Validates: Requirements 2.3, 3.1** func TestProperty_PositiveLimitEnforcement(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(limit uint64, bytesToAdd uint64) bool { - // Ensure limit is positive and reasonable (1 byte to 10GB) limit = (limit % (10 * 1024 * 1024 * 1024)) + 1 - // Ensure bytesToAdd is at least equal to limit to trigger rejection - bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + 1 tracker := NewConcurrentBytesTracker(limit, nil) - - // Add bytes to reach or exceed the limit - tracker.Add(bytesToAdd) - - // TryAccept should reject when at or above limit - err := tracker.TryAccept(context.Background()) + err := tracker.Add(bytesToAdd) return err != nil } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_BelowLimitAccepts tests that requests are accepted when below limit -// **Feature: storegateway-max-data-limit, Property 5: Positive limit enforcement (complement)** -// **Validates: Requirements 2.3, 3.1** func TestProperty_BelowLimitAccepts(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(limit uint64, bytesToAdd uint64) bool { - // Ensure limit is positive and reasonable (1KB to 10GB) limit = (limit % (10 * 1024 * 1024 * 1024)) + 1024 - // Ensure bytesToAdd is strictly less than limit - bytesToAdd = bytesToAdd % limit + bytesToAdd = bytesToAdd % (limit + 1) tracker := NewConcurrentBytesTracker(limit, nil) - - // Add bytes below the limit - tracker.Add(bytesToAdd) - - // TryAccept should accept when below limit - err := tracker.TryAccept(context.Background()) - return err == nil - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) + return tracker.Add(bytesToAdd) == nil } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_RejectionIncrementsMetric tests Property 9: Rejection increments metric -// **Feature: storegateway-max-data-limit, Property 9: Rejection increments metric** -// **Validates: Requirements 3.4** -func TestProperty_RejectionIncrementsMetric(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - +func TestProperty_RejectionDoesNotCountBytes(t *testing.T) { f := func(limit uint64, numRejections uint8) bool { - // Ensure limit is positive and reasonable (1KB to 1GB) limit = (limit % (1024 * 1024 * 1024)) + 1024 - // Ensure at least 1 rejection attempt numRejections = (numRejections % 10) + 1 - reg := prometheus.NewRegistry() - tracker := NewConcurrentBytesTracker(limit, reg).(*concurrentBytesTracker) - - // Fill up to the limit - tracker.Add(limit) - - // Get initial rejection count - initialRejections := getCounterValue(tracker.rejectedRequestsTotal) - - // Attempt multiple rejections - for i := uint8(0); i < numRejections; i++ { - err := tracker.TryAccept(context.Background()) - if err == nil { - return false // Should have been rejected - } - } - - // Verify rejection counter increased by the number of rejections - finalRejections := getCounterValue(tracker.rejectedRequestsTotal) - return finalRejections == initialRejections+float64(numRejections) - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// getCounterValue extracts the current value from a prometheus Counter -func getCounterValue(counter prometheus.Counter) float64 { - ch := make(chan prometheus.Metric, 1) - counter.Collect(ch) - m := <-ch - var metric dto.Metric - m.Write(&metric) - return metric.GetCounter().GetValue() -} - -// TestProperty_RejectionReturns503Error tests Property 7: Rejection returns 503 error -// **Feature: storegateway-max-data-limit, Property 7: Rejection returns 503 error** -// **Validates: Requirements 3.2** -func TestProperty_RejectionReturns503Error(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(limit uint64, bytesToAdd uint64) bool { - // Ensure limit is positive and reasonable (1 byte to 1GB) - limit = (limit % (1024 * 1024 * 1024)) + 1 - // Ensure bytesToAdd is at least equal to limit to trigger rejection - bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) - tracker := NewConcurrentBytesTracker(limit, nil) - - // Add bytes to reach or exceed the limit - tracker.Add(bytesToAdd) - - // TryAccept should reject with 503 error - err := tracker.TryAccept(context.Background()) - if err == nil { + if err := tracker.Add(limit); err != nil { return false } - // Check that the error contains HTTP 503 status code - // The error is created using httpgrpc.Errorf(http.StatusServiceUnavailable, ...) - // which embeds the status code in the error - errStr := err.Error() - // The httpgrpc error format includes the status code - return errStr != "" // Error should exist - } + for i := uint8(0); i < numRejections; i++ { + if tracker.Add(1) == nil { + return false + } + } - if err := quick.Check(f, config); err != nil { - t.Error(err) + return tracker.Current() == limit } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_RejectionErrorMessageContainsReason tests Property 8: Rejection error message contains reason -// **Feature: storegateway-max-data-limit, Property 8: Rejection error message contains reason** -// **Validates: Requirements 3.3** -func TestProperty_RejectionErrorMessageContainsReason(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - +func TestProperty_RejectionReturnsPoolExhausted(t *testing.T) { f := func(limit uint64, bytesToAdd uint64) bool { - // Ensure limit is positive and reasonable (1 byte to 1GB) limit = (limit % (1024 * 1024 * 1024)) + 1 - // Ensure bytesToAdd is at least equal to limit to trigger rejection - bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + 1 tracker := NewConcurrentBytesTracker(limit, nil) - - // Add bytes to reach or exceed the limit - tracker.Add(bytesToAdd) - - // TryAccept should reject with error message containing reason - err := tracker.TryAccept(context.Background()) - if err == nil { - return false - } - - // Check that the error message contains information about bytes limit - errStr := err.Error() - containsBytesLimit := contains(errStr, "concurrent bytes limit") || - contains(errStr, "bytes limit") || - contains(errStr, "limit reached") - - return containsBytesLimit - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// contains checks if a string contains a substring (case-insensitive) -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(substr) == 0 || - (len(s) > 0 && len(substr) > 0 && findSubstring(s, substr))) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } + return tracker.Add(bytesToAdd) == pool.ErrPoolExhausted } - return false + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_RecoveryAfterRelease tests Property 10: Recovery after release -// **Feature: storegateway-max-data-limit, Property 10: Recovery after release** -// **Validates: Requirements 3.5** func TestProperty_RecoveryAfterRelease(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(limit uint64, bytesToAdd uint64) bool { - // Ensure limit is positive and reasonable (1KB to 1GB) limit = (limit % (1024 * 1024 * 1024)) + 1024 - // Ensure bytesToAdd is at least equal to limit to trigger rejection - bytesToAdd = limit + (bytesToAdd % (1024 * 1024 * 1024)) + bytesToAdd = (bytesToAdd % limit) + 1 tracker := NewConcurrentBytesTracker(limit, nil) - - // Add bytes to reach or exceed the limit - release := tracker.Add(bytesToAdd) - - // TryAccept should reject when at capacity - err := tracker.TryAccept(context.Background()) - if err == nil { - return false // Should have been rejected - } - - // Release the bytes - release() - - // TryAccept should now accept since we're below the limit - err = tracker.TryAccept(context.Background()) - return err == nil - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// TestIntegration_ResourceLimiterPrecedesBytesLimiter verifies that the resource-based limiter -// is checked before the bytes limiter in the Series method. -// This test validates Requirements 4.3 and 4.4. -func TestIntegration_ResourceLimiterPrecedesBytesLimiter(t *testing.T) { - t.Run("resource limiter rejection does not increment bytes counter", func(t *testing.T) { - // Create a tracker with a high limit (should not reject) - tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) - - // Verify initial state - assert.Equal(t, uint64(0), tracker.Current()) - - // Simulate the ordering: resource limiter rejects first - // In the actual Series method, checkResourceUtilization() is called before TryAccept() - // If resource limiter rejects, TryAccept() is never called, so bytes counter stays at 0 - - // This test verifies the expected behavior: when resource limiter rejects, - // the bytes counter should not be incremented - resourceLimiterRejected := true // Simulating resource limiter rejection - - if !resourceLimiterRejected { - // Only call TryAccept if resource limiter accepts - err := tracker.TryAccept(context.Background()) - assert.NoError(t, err) - } - - // Bytes counter should remain at 0 because resource limiter rejected first - assert.Equal(t, uint64(0), tracker.Current()) - }) - - t.Run("bytes limiter checked after resource limiter accepts", func(t *testing.T) { - // Create a tracker with a low limit (should reject) - tracker := NewConcurrentBytesTracker(100, nil) - - // Fill up the tracker to trigger rejection - tracker.Add(100) - - // Simulate the ordering: resource limiter accepts, then bytes limiter is checked - resourceLimiterRejected := false // Simulating resource limiter acceptance - - var bytesLimiterErr error - if !resourceLimiterRejected { - // Only call TryAccept if resource limiter accepts - bytesLimiterErr = tracker.TryAccept(context.Background()) - } - - // Bytes limiter should reject because we're at capacity - assert.Error(t, bytesLimiterErr) - assert.Contains(t, bytesLimiterErr.Error(), "concurrent bytes limit reached") - }) -} - -// TestIntegration_OrderingVerification tests that the ordering of limiters is correct -// by simulating the Series method flow. -func TestIntegration_OrderingVerification(t *testing.T) { - type limiterResult struct { - resourceLimiterCalled bool - bytesLimiterCalled bool - resourceLimiterErr error - bytesLimiterErr error - } - - tests := []struct { - name string - resourceLimiterRejects bool - bytesLimiterRejects bool - expectedResult limiterResult - }{ - { - name: "both limiters accept", - resourceLimiterRejects: false, - bytesLimiterRejects: false, - expectedResult: limiterResult{ - resourceLimiterCalled: true, - bytesLimiterCalled: true, - resourceLimiterErr: nil, - bytesLimiterErr: nil, - }, - }, - { - name: "resource limiter rejects - bytes limiter not called", - resourceLimiterRejects: true, - bytesLimiterRejects: false, - expectedResult: limiterResult{ - resourceLimiterCalled: true, - bytesLimiterCalled: false, - resourceLimiterErr: assert.AnError, - bytesLimiterErr: nil, - }, - }, - { - name: "resource limiter accepts - bytes limiter rejects", - resourceLimiterRejects: false, - bytesLimiterRejects: true, - expectedResult: limiterResult{ - resourceLimiterCalled: true, - bytesLimiterCalled: true, - resourceLimiterErr: nil, - bytesLimiterErr: assert.AnError, - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - var result limiterResult - - // Simulate checkResourceUtilization - result.resourceLimiterCalled = true - if tc.resourceLimiterRejects { - result.resourceLimiterErr = assert.AnError - } - - // Only check bytes limiter if resource limiter accepts (simulating Series method flow) - if result.resourceLimiterErr == nil { - result.bytesLimiterCalled = true - if tc.bytesLimiterRejects { - result.bytesLimiterErr = assert.AnError - } - } - - assert.Equal(t, tc.expectedResult.resourceLimiterCalled, result.resourceLimiterCalled) - assert.Equal(t, tc.expectedResult.bytesLimiterCalled, result.bytesLimiterCalled) - - if tc.expectedResult.resourceLimiterErr != nil { - assert.Error(t, result.resourceLimiterErr) - } else { - assert.NoError(t, result.resourceLimiterErr) - } - - if tc.expectedResult.bytesLimiterErr != nil { - assert.Error(t, result.bytesLimiterErr) - } else { - assert.NoError(t, result.bytesLimiterErr) - } - }) - } -} - -// TestIntegration_RequestTrackerIntegration verifies that the request tracker is called -// after the bytes limiter accepts a request, and not called for rejected requests. -// This test validates Requirements 4.1 and 4.2. -func TestIntegration_RequestTrackerIntegration(t *testing.T) { - t.Run("request tracker called after bytes limiter accepts", func(t *testing.T) { - // Create a tracker with a high limit (should accept) - tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) - - // Simulate the flow in gateway.go Series method: - // 1. checkResourceUtilization() - assume it passes - // 2. concurrentBytesTracker.TryAccept() - // 3. g.stores.Series() - which internally checks inflightRequests - - requestTrackerCalled := false - - // Step 2: Bytes limiter check - err := tracker.TryAccept(context.Background()) - require.NoError(t, err) - - // Step 3: Only if bytes limiter accepts, request tracker is called - // (simulating the flow in bucket_stores.go Series method) - if err == nil { - requestTrackerCalled = true - } - - assert.True(t, requestTrackerCalled, "request tracker should be called after bytes limiter accepts") - }) - - t.Run("request tracker not called when bytes limiter rejects", func(t *testing.T) { - // Create a tracker with a low limit (should reject) - tracker := NewConcurrentBytesTracker(100, nil) - - // Fill up the tracker to trigger rejection - tracker.Add(100) - - requestTrackerCalled := false - - // Bytes limiter check - should reject - err := tracker.TryAccept(context.Background()) - require.Error(t, err) - - // Request tracker should NOT be called because bytes limiter rejected - if err == nil { - requestTrackerCalled = true - } - - assert.False(t, requestTrackerCalled, "request tracker should NOT be called when bytes limiter rejects") - }) -} - -// TestIntegration_RequestTrackerNotCalledForRejectedRequests verifies that when the bytes -// limiter rejects a request, the request tracker is never invoked. -// This test validates Requirement 4.2. -func TestIntegration_RequestTrackerNotCalledForRejectedRequests(t *testing.T) { - tests := []struct { - name string - bytesLimit uint64 - bytesToAdd uint64 - expectBytesLimiterReject bool - expectRequestTrackerCalled bool - }{ - { - name: "bytes limiter accepts - request tracker called", - bytesLimit: 1000, - bytesToAdd: 0, - expectBytesLimiterReject: false, - expectRequestTrackerCalled: true, - }, - { - name: "bytes limiter at capacity - request tracker not called", - bytesLimit: 100, - bytesToAdd: 100, - expectBytesLimiterReject: true, - expectRequestTrackerCalled: false, - }, - { - name: "bytes limiter over capacity - request tracker not called", - bytesLimit: 100, - bytesToAdd: 200, - expectBytesLimiterReject: true, - expectRequestTrackerCalled: false, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - tracker := NewConcurrentBytesTracker(tc.bytesLimit, nil) - - // Add bytes to simulate current load - if tc.bytesToAdd > 0 { - tracker.Add(tc.bytesToAdd) - } - - // Simulate the Series method flow - requestTrackerCalled := false - - err := tracker.TryAccept(context.Background()) - - if tc.expectBytesLimiterReject { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - // Request tracker is only called if bytes limiter accepts - if err == nil { - requestTrackerCalled = true - } - - assert.Equal(t, tc.expectRequestTrackerCalled, requestTrackerCalled) - }) - } -} - -// TestProperty_ResourceLimiterPrecedesBytesLimiter tests Property 13: Resource limiter precedes bytes limiter -// **Feature: storegateway-max-data-limit, Property 13: Resource limiter precedes bytes limiter** -// **Validates: Requirements 4.3** -func TestProperty_ResourceLimiterPrecedesBytesLimiter(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) - bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - // Random initial bytes (0 to 2x limit to test both accept and reject scenarios) - initialBytes := uint64(rng.Intn(int(bytesLimit * 2))) - - // Random resource limiter decision - resourceLimiterRejects := rng.Intn(2) == 0 - - tracker := NewConcurrentBytesTracker(bytesLimit, nil) - - // Add initial bytes - if initialBytes > 0 { - tracker.Add(initialBytes) - } - - initialBytesCount := tracker.Current() - - // Simulate the Series method flow: - // 1. Resource limiter is checked first - // 2. Only if resource limiter accepts, bytes limiter is checked - - var bytesLimiterCalled bool - - if resourceLimiterRejects { - // Resource limiter rejects - bytes limiter should NOT be called - // The bytes counter should remain unchanged - bytesLimiterCalled = false - } else { - // Resource limiter accepts - bytes limiter is called - bytesLimiterCalled = true - _ = tracker.TryAccept(context.Background()) // We don't care about the result, just that it was called - } - - // Property: When resource limiter rejects, bytes counter should not change - // (because bytes limiter is never called) - if resourceLimiterRejects { - // Bytes counter should remain at initial value - if tracker.Current() != initialBytesCount { - return false - } - // Bytes limiter should not have been called - if bytesLimiterCalled { - return false - } - } - - return true - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// TestProperty_RequestTrackerIntegration tests Property 11: Request tracker integration -// **Feature: storegateway-max-data-limit, Property 11: Request tracker integration** -// **Validates: Requirements 4.1** -func TestProperty_RequestTrackerIntegration(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) - bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - // Random initial bytes that will NOT exceed the limit (to ensure acceptance) - initialBytes := uint64(rng.Intn(int(bytesLimit / 2))) - - tracker := NewConcurrentBytesTracker(bytesLimit, nil) - - // Add initial bytes (below limit) - if initialBytes > 0 { - tracker.Add(initialBytes) - } - - // Simulate the Series method flow: - // 1. Resource limiter accepts (simulated) - // 2. Bytes limiter is checked - // 3. If bytes limiter accepts, request tracker is called - - resourceLimiterAccepts := true // Simulating resource limiter acceptance - var requestTrackerCalled bool - - if resourceLimiterAccepts { - err := tracker.TryAccept(context.Background()) - if err == nil { - // Bytes limiter accepted - request tracker should be called - requestTrackerCalled = true - } - } - - // Property: When bytes limiter accepts a request, request tracker should also track that request - // Since we ensured initialBytes < bytesLimit, TryAccept should succeed - // and request tracker should be called - return requestTrackerCalled - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// TestProperty_RejectedRequestsNotTracked tests Property 12: Rejected requests not tracked -// **Feature: storegateway-max-data-limit, Property 12: Rejected requests not tracked** -// **Validates: Requirements 4.2** -func TestProperty_RejectedRequestsNotTracked(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) - bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - // Random initial bytes that WILL exceed the limit (to ensure rejection) - // Add at least the limit amount to guarantee rejection - initialBytes := bytesLimit + uint64(rng.Intn(int(bytesLimit))) - - tracker := NewConcurrentBytesTracker(bytesLimit, nil) - - // Add initial bytes (at or above limit) - tracker.Add(initialBytes) - - // Simulate the Series method flow: - // 1. Resource limiter accepts (simulated) - // 2. Bytes limiter is checked - // 3. If bytes limiter rejects, request tracker should NOT be called - - resourceLimiterAccepts := true // Simulating resource limiter acceptance - var requestTrackerCalled bool - - if resourceLimiterAccepts { - err := tracker.TryAccept(context.Background()) - if err == nil { - // Bytes limiter accepted - request tracker would be called - requestTrackerCalled = true - } - // If err != nil, bytes limiter rejected - request tracker should NOT be called - } - - // Property: When bytes limiter rejects a request, request tracker should NOT track that request - // Since we ensured initialBytes >= bytesLimit, TryAccept should fail - // and request tracker should NOT be called - return !requestTrackerCalled - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// TestProperty_CancellationBeforeTracking tests Property 17: Cancellation before tracking -// **Feature: storegateway-max-data-limit, Property 17: Cancellation before tracking** -// **Validates: Requirements 8.2** -func TestProperty_CancellationBeforeTracking(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) - bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - tracker := NewConcurrentBytesTracker(bytesLimit, nil) - - // Verify initial state - initialBytes := tracker.Current() - if initialBytes != 0 { + if err := tracker.Add(limit); err != nil { return false } - - // Create a cancelled context - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - // TryAccept with cancelled context should return error - err := tracker.TryAccept(ctx) - if err == nil { - return false // Should have returned an error - } - - // Verify the error is context.Canceled - if err != context.Canceled { + if tracker.Add(1) == nil { return false } - // Property: When context is cancelled before tracking, bytes counter should not be incremented - // The counter should remain at 0 - return tracker.Current() == 0 - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) + tracker.Release(limit) + return tracker.Add(bytesToAdd) == nil } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_ConcurrentDecrementsCorrectness tests Property 18: Concurrent decrements correctness -// **Feature: storegateway-max-data-limit, Property 18: Concurrent decrements correctness** -// **Validates: Requirements 8.4** func TestProperty_ConcurrentDecrementsCorrectness(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(seed int64) bool { rng := rand.New(rand.NewSource(seed)) - numOps := rng.Intn(100) + 10 // 10 to 109 operations + numOps := rng.Intn(100) + 10 tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) - // First, add bytes sequentially to establish a known state var totalBytes uint64 - releases := make([]func(), numOps) bytesPerOp := make([]uint64, numOps) - for i := range numOps { - bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + bytes := uint64(rng.Intn(1024*1024)) + 1 bytesPerOp[i] = bytes totalBytes += bytes - releases[i] = tracker.Add(bytes) + _ = tracker.Add(bytes) } - // Verify all adds were counted if tracker.Current() != totalBytes { return false } - // Now release all bytes concurrently var wg sync.WaitGroup for i := range numOps { wg.Add(1) go func(idx int) { defer wg.Done() - releases[idx]() + tracker.Release(bytesPerOp[idx]) }(i) } wg.Wait() - // Property: After all concurrent releases, the counter should be exactly 0 - // All decrements should be applied correctly without losing any updates return tracker.Current() == 0 } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_PanicRecoveryCleanup tests Property 19: Panic recovery cleanup -// **Feature: storegateway-max-data-limit, Property 19: Panic recovery cleanup** -// **Validates: Requirements 8.5** func TestProperty_PanicRecoveryCleanup(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(seed int64) bool { rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - // Random bytes to track (1 byte to 1MB) bytesToTrack := uint64(rng.Intn(1024*1024)) + 1 tracker := NewConcurrentBytesTracker(bytesLimit, nil) - // Verify initial state - if tracker.Current() != 0 { - return false - } - - // Simulate a function that panics after tracking bytes - // The deferred Release should still execute func() { - defer func() { - if r := recover(); r != nil { - // Panic was recovered - this is expected - } - }() + defer func() { recover() }() - // Track bytes and defer release - release := tracker.Add(bytesToTrack) - defer release() - - // Verify bytes are tracked - if tracker.Current() != bytesToTrack { - panic("bytes not tracked correctly") + if err := tracker.Add(bytesToTrack); err != nil { + return } + defer tracker.Release(bytesToTrack) - // Simulate panic during request processing - panic("simulated panic during request processing") + panic("simulated panic") }() - // Property: After panic recovery, the deferred Release function should have executed - // and the counter should be back to 0 return tracker.Current() == 0 } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } -// TestProperty_PanicRecoveryWithRegistry tests panic recovery with the tracking limiter registry -// **Feature: storegateway-max-data-limit, Property 19: Panic recovery cleanup (registry variant)** -// **Validates: Requirements 8.5** func TestProperty_PanicRecoveryWithRegistry(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - f := func(seed int64) bool { rng := rand.New(rand.NewSource(seed)) - - // Random bytes limit (1KB to 1GB) bytesLimit := uint64(rng.Intn(1024*1024*1024)) + 1024 - - // Random number of limiters (1 to 10) numLimiters := rng.Intn(10) + 1 tracker := NewConcurrentBytesTracker(bytesLimit, nil) registry := newTrackingBytesLimiterRegistry() - // Verify initial state - if tracker.Current() != 0 { - return false - } - - // Simulate a function that creates multiple limiters and panics func() { - defer func() { - if r := recover(); r != nil { - // Panic was recovered - this is expected - } - }() - - // Defer registry cleanup + defer func() { recover() }() defer registry.ReleaseAll() - // Create multiple tracking limiters and reserve bytes - var totalBytes uint64 for range numLimiters { inner := newMockBytesLimiter(bytesLimit) limiter := newTrackingBytesLimiter(inner, tracker) registry.Register(limiter) - bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB + bytes := uint64(rng.Intn(1024*1024)) + 1 limiter.ReserveWithType(bytes, store.PostingsFetched) - totalBytes += bytes } - // Verify bytes are tracked - if tracker.Current() != totalBytes { - panic("bytes not tracked correctly") - } - - // Simulate panic during request processing - panic("simulated panic during request processing") + panic("simulated panic") }() - // Property: After panic recovery, the deferred ReleaseAll should have executed - // and the counter should be back to 0 return tracker.Current() == 0 } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) +} + +func TestPropertyAddReturnsErrPoolExhaustedIffOverLimit(t *testing.T) { + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + limit := uint64(rng.Intn(10*1024*1024)) + 1 + + tracker := NewConcurrentBytesTracker(limit, nil) + defer tracker.Stop() + + numAdds := rng.Intn(20) + 1 + var cumulativeBytes uint64 - if err := quick.Check(f, config); err != nil { - t.Error(err) + for i := 0; i < numAdds; i++ { + bytes := uint64(rng.Intn(int(limit))) + 1 + err := tracker.Add(bytes) + + if cumulativeBytes+bytes > limit { + if err != pool.ErrPoolExhausted { + return false + } + } else { + if err != nil { + return false + } + cumulativeBytes += bytes + } + } + + return tracker.Current() == cumulativeBytes + } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) +} + +func TestPropertyAddReturnsNilErrorWhenLimitingDisabled(t *testing.T) { + f := func(bytes uint64) bool { + bytes = (bytes % (1024 * 1024 * 1024)) + 1 + tracker := NewConcurrentBytesTracker(0, nil) + defer tracker.Stop() + return tracker.Add(bytes) == nil } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index a357d111431..4c38fc117b8 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -416,9 +416,6 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto if err := g.checkResourceUtilization(); err != nil { return err } - if err := g.concurrentBytesTracker.TryAccept(srv.Context()); err != nil { - return err - } return g.stores.Series(req, srv) } diff --git a/pkg/storegateway/tracking_bytes_limiter.go b/pkg/storegateway/tracking_bytes_limiter.go index f4a87c58917..5b03d8a2a42 100644 --- a/pkg/storegateway/tracking_bytes_limiter.go +++ b/pkg/storegateway/tracking_bytes_limiter.go @@ -29,7 +29,9 @@ func (t *trackingBytesLimiter) ReserveWithType(num uint64, dataType store.StoreD return err } - _ = t.tracker.Add(num) + if err := t.tracker.Add(num); err != nil { + return err + } _ = t.tracked.Add(num) return nil diff --git a/pkg/storegateway/tracking_bytes_limiter_test.go b/pkg/storegateway/tracking_bytes_limiter_test.go index c4be68cd459..11e4997bda1 100644 --- a/pkg/storegateway/tracking_bytes_limiter_test.go +++ b/pkg/storegateway/tracking_bytes_limiter_test.go @@ -10,7 +10,6 @@ import ( "github.com/thanos-io/thanos/pkg/store" ) -// mockBytesLimiter is a mock implementation of store.BytesLimiter for testing type mockBytesLimiter struct { reservedBytes uint64 limit uint64 @@ -35,9 +34,7 @@ func TestTrackingBytesLimiter_Basic(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) limiter := newTrackingBytesLimiter(inner, tracker) - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - + require.NoError(t, limiter.ReserveWithType(100, store.PostingsFetched)) assert.Equal(t, uint64(100), inner.Reserved()) }) @@ -46,9 +43,7 @@ func TestTrackingBytesLimiter_Basic(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) limiter := newTrackingBytesLimiter(inner, tracker) - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - + require.NoError(t, limiter.ReserveWithType(100, store.PostingsFetched)) assert.Equal(t, uint64(100), tracker.Current()) assert.Equal(t, uint64(100), limiter.TrackedBytes()) }) @@ -58,11 +53,8 @@ func TestTrackingBytesLimiter_Basic(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) limiter := newTrackingBytesLimiter(inner, tracker) - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - + require.NoError(t, limiter.ReserveWithType(100, store.PostingsFetched)) limiter.Release() - assert.Equal(t, uint64(0), tracker.Current()) assert.Equal(t, uint64(0), limiter.TrackedBytes()) }) @@ -72,247 +64,155 @@ func TestTrackingBytesLimiter_Basic(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) limiter := newTrackingBytesLimiter(inner, tracker) - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - err = limiter.ReserveWithType(200, store.SeriesFetched) - require.NoError(t, err) - err = limiter.ReserveWithType(300, store.ChunksFetched) - require.NoError(t, err) - + require.NoError(t, limiter.ReserveWithType(100, store.PostingsFetched)) + require.NoError(t, limiter.ReserveWithType(200, store.SeriesFetched)) + require.NoError(t, limiter.ReserveWithType(300, store.ChunksFetched)) assert.Equal(t, uint64(600), tracker.Current()) assert.Equal(t, uint64(600), limiter.TrackedBytes()) }) } -// TestProperty_BytesLimiterIntegration tests Property 15: BytesLimiter integration -// **Feature: storegateway-max-data-limit, Property 15: BytesLimiter integration** -// **Validates: Requirements 5.2** -func TestProperty_BytesLimiterIntegration(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - numReserves := rng.Intn(50) + 1 // 1 to 50 reserves - - inner := newMockBytesLimiter(uint64(100) * 1024 * 1024 * 1024) // 100GB limit - tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) - limiter := newTrackingBytesLimiter(inner, tracker) - - var totalBytes uint64 - - // Reserve random amounts of bytes - for range numReserves { - bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB - err := limiter.ReserveWithType(bytes, store.StoreDataType(rng.Intn(6))) - if err != nil { - return false - } - totalBytes += bytes - } - - // Verify concurrent tracker was updated with the same total amount - if tracker.Current() != totalBytes { - return false - } - - // Verify inner limiter received the same total amount - if inner.Reserved() != totalBytes { - return false - } - - // Verify tracked bytes matches - if limiter.TrackedBytes() != totalBytes { - return false - } - - // Release and verify cleanup - limiter.Release() - return tracker.Current() == 0 && limiter.TrackedBytes() == 0 - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - -// TestProperty_AllDataTypesTracked tests Property 16: All data types tracked -// **Feature: storegateway-max-data-limit, Property 16: All data types tracked** -// **Validates: Requirements 5.3** -func TestProperty_AllDataTypesTracked(t *testing.T) { - config := &quick.Config{ - MaxCount: 100, - } - - // All StoreDataType values from Thanos - dataTypes := []store.StoreDataType{ - store.PostingsFetched, - store.PostingsTouched, - store.SeriesFetched, - store.SeriesTouched, - store.ChunksFetched, - store.ChunksTouched, - } - - f := func(seed int64) bool { - rng := rand.New(rand.NewSource(seed)) - - inner := newMockBytesLimiter(uint64(100) * 1024 * 1024 * 1024) - tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) - limiter := newTrackingBytesLimiter(inner, tracker) - - var totalBytes uint64 - - // Reserve bytes for each data type - for _, dataType := range dataTypes { - bytes := uint64(rng.Intn(1024*1024)) + 1 // 1 byte to 1MB - err := limiter.ReserveWithType(bytes, dataType) - if err != nil { - return false - } - totalBytes += bytes - } - - // Verify all data types were tracked consistently - if tracker.Current() != totalBytes { - return false - } - - if limiter.TrackedBytes() != totalBytes { - return false - } - - // Release and verify cleanup - limiter.Release() - return tracker.Current() == 0 - } - - if err := quick.Check(f, config); err != nil { - t.Error(err) - } -} - func TestTrackingBytesLimiter_ReleaseIdempotent(t *testing.T) { inner := newMockBytesLimiter(1000) tracker := NewConcurrentBytesTracker(10000, nil) limiter := newTrackingBytesLimiter(inner, tracker) - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - - // First release - limiter.Release() - assert.Equal(t, uint64(0), tracker.Current()) - - // Second release should be safe (idempotent) + require.NoError(t, limiter.ReserveWithType(100, store.PostingsFetched)) limiter.Release() assert.Equal(t, uint64(0), tracker.Current()) -} - -func TestTrackingBytesLimiter_WithLimitingDisabled(t *testing.T) { - inner := newMockBytesLimiter(1000) - tracker := NewConcurrentBytesTracker(0, nil) // 0 limit = limiting disabled but tracking enabled - limiter := newTrackingBytesLimiter(inner, tracker) - - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - - // Inner limiter should still work - assert.Equal(t, uint64(100), inner.Reserved()) - // Tracker should track bytes even when limiting is disabled - assert.Equal(t, uint64(100), tracker.Current()) - - // Release should work correctly limiter.Release() assert.Equal(t, uint64(0), tracker.Current()) } -func TestTrackingBytesLimiter_PanicRecovery(t *testing.T) { - t.Run("defer release cleans up on panic with recover", func(t *testing.T) { - inner := newMockBytesLimiter(1000) - tracker := NewConcurrentBytesTracker(10000, nil) - limiter := newTrackingBytesLimiter(inner, tracker) - - // Use a wrapper function to recover from panic - func() { - defer func() { - if r := recover(); r != nil { - // Panic was recovered, verify cleanup happened - assert.Equal(t, uint64(0), tracker.Current(), "bytes should be released after panic recovery") - } - }() - - defer limiter.Release() - - err := limiter.ReserveWithType(100, store.PostingsFetched) - require.NoError(t, err) - - // Verify bytes are tracked before panic - assert.Equal(t, uint64(100), tracker.Current()) - - // Simulate panic - panic("simulated panic") - }() - - // Verify cleanup happened - assert.Equal(t, uint64(0), tracker.Current()) - }) -} - func TestTrackingBytesLimiterRegistry(t *testing.T) { - t.Run("registry releases all limiters", func(t *testing.T) { + t.Run("releases all limiters", func(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) registry := newTrackingBytesLimiterRegistry() - // Create multiple limiters and register them limiter1 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) limiter2 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) limiter3 := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) - registry.Register(limiter1) registry.Register(limiter2) registry.Register(limiter3) - // Reserve bytes through each limiter limiter1.ReserveWithType(100, store.PostingsFetched) limiter2.ReserveWithType(200, store.SeriesFetched) limiter3.ReserveWithType(300, store.ChunksFetched) - - // Verify total bytes tracked assert.Equal(t, uint64(600), tracker.Current()) - // Release all through registry registry.ReleaseAll() - - // Verify all bytes released assert.Equal(t, uint64(0), tracker.Current()) }) - t.Run("registry handles panic recovery", func(t *testing.T) { + t.Run("handles panic recovery", func(t *testing.T) { tracker := NewConcurrentBytesTracker(10000, nil) registry := newTrackingBytesLimiterRegistry() func() { - defer func() { - if r := recover(); r != nil { - // Panic was recovered - } - }() - + defer func() { recover() }() defer registry.ReleaseAll() limiter := newTrackingBytesLimiter(newMockBytesLimiter(1000), tracker) registry.Register(limiter) - limiter.ReserveWithType(100, store.PostingsFetched) - assert.Equal(t, uint64(100), tracker.Current()) panic("simulated panic") }() - // Verify cleanup happened assert.Equal(t, uint64(0), tracker.Current()) }) } + +func TestProperty_BytesLimiterIntegration(t *testing.T) { + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + numReserves := rng.Intn(50) + 1 + + inner := newMockBytesLimiter(uint64(100) * 1024 * 1024 * 1024) + tracker := NewConcurrentBytesTracker(uint64(100)*1024*1024*1024, nil) + limiter := newTrackingBytesLimiter(inner, tracker) + + var totalBytes uint64 + for range numReserves { + bytes := uint64(rng.Intn(1024*1024)) + 1 + if err := limiter.ReserveWithType(bytes, store.StoreDataType(rng.Intn(6))); err != nil { + return false + } + totalBytes += bytes + } + + if tracker.Current() != totalBytes || inner.Reserved() != totalBytes || limiter.TrackedBytes() != totalBytes { + return false + } + + limiter.Release() + return tracker.Current() == 0 && limiter.TrackedBytes() == 0 + } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) +} + +func TestPropertyReserveWithTypePropagatesAddError(t *testing.T) { + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + limit := uint64(rng.Intn(10*1024*1024)) + 1 + + tracker := NewConcurrentBytesTracker(limit, nil) + inner := newMockBytesLimiter(^uint64(0)) + limiter := newTrackingBytesLimiter(inner, tracker) + + numReserves := rng.Intn(20) + 1 + var trackedBytes uint64 + + for range numReserves { + bytes := uint64(rng.Intn(2*1024*1024)) + 1 + wouldExceed := trackedBytes+bytes > limit + + err := limiter.ReserveWithType(bytes, store.StoreDataType(rng.Intn(6))) + + if wouldExceed && err == nil { + return false + } + if !wouldExceed && err != nil { + return false + } + if err == nil { + trackedBytes += bytes + } + } + return true + } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) +} + +func TestPropertyReleaseWorksCorrectlyAfterAddError(t *testing.T) { + f := func(seed int64) bool { + rng := rand.New(rand.NewSource(seed)) + limit := uint64(rng.Intn(1024)) + 1 + + tracker := NewConcurrentBytesTracker(limit, nil) + inner := newMockBytesLimiter(^uint64(0)) + limiter := newTrackingBytesLimiter(inner, tracker) + + numReserves := rng.Intn(10) + 1 + sawError := false + + for range numReserves { + bytes := uint64(rng.Intn(int(limit))) + 1 + if limiter.ReserveWithType(bytes, store.StoreDataType(rng.Intn(6))) != nil { + sawError = true + } + } + + if !sawError { + if limiter.ReserveWithType(limit+1, store.PostingsFetched) == nil { + return false + } + } + + limiter.Release() + return tracker.Current() == 0 && limiter.TrackedBytes() == 0 + } + require.NoError(t, quick.Check(f, &quick.Config{MaxCount: 100})) +} From 7da50137814389ac46668f884abfc32a143fdf05 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 19 Feb 2026 13:48:18 -0800 Subject: [PATCH 3/3] Remove unneeded gateway path Signed-off-by: Essam Eldaly --- pkg/storegateway/bucket_stores.go | 9 ++++----- pkg/storegateway/concurrent_bytes_tracker.go | 1 + pkg/storegateway/gateway.go | 6 +----- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 6c8da2e558b..4c4e5b53da7 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -112,20 +112,19 @@ type ThanosBucketStores struct { var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. -// Note: concurrentBytesTracker is currently only used by the TSDB bucket store implementation. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) { switch cfg.BucketStore.BucketStoreType { case string(tsdb.ParquetBucketStore): return newParquetBucketStores(cfg, bucketClient, limits, logger, reg) case string(tsdb.TSDBBucketStore): - return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg, concurrentBytesTracker) + return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg) default: return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType) } } // newThanosBucketStores creates a new TSDB-based bucket stores -func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (*ThanosBucketStores, error) { +func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) { matchers := tsdb.NewMatchers() cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg) if err != nil { @@ -155,7 +154,7 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), userTokenBuckets: make(map[string]*util.TokenBucket), inflightRequests: util.NewInflightRequestTracker(), - concurrentBytesTracker: concurrentBytesTracker, + concurrentBytesTracker: NewConcurrentBytesTracker(uint64(cfg.BucketStore.MaxConcurrentBytes), reg), trackingRegistryHolder: &trackingLimiterRegistryHolder{}, syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", diff --git a/pkg/storegateway/concurrent_bytes_tracker.go b/pkg/storegateway/concurrent_bytes_tracker.go index 1908949ee3f..69d7ae30765 100644 --- a/pkg/storegateway/concurrent_bytes_tracker.go +++ b/pkg/storegateway/concurrent_bytes_tracker.go @@ -60,6 +60,7 @@ func NewConcurrentBytesTracker(maxConcurrentBytes uint64, reg prometheus.Registe func (t *concurrentBytesTracker) Add(bytes uint64) error { if t.maxConcurrentBytes > 0 && t.Current()+bytes > t.maxConcurrentBytes { + t.rejectedRequestsTotal.Inc() return pool.ErrPoolExhausted } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 4c38fc117b8..14724d60b51 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -131,9 +131,6 @@ type StoreGateway struct { resourceBasedLimiter *util_limiter.ResourceBasedLimiter - // Concurrent bytes tracker for limiting bytes being processed across all queries. - concurrentBytesTracker ConcurrentBytesTracker - bucketSync *prometheus.CounterVec } @@ -241,8 +238,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf shardingStrategy = NewNoShardingStrategy(logger, allowedTenants) } - g.concurrentBytesTracker = NewConcurrentBytesTracker(uint64(storageCfg.BucketStore.MaxConcurrentBytes), reg) - g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg), g.concurrentBytesTracker) + g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) if err != nil { return nil, errors.Wrap(err, "create bucket stores") }