Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
ErrInvalidMaxConcurrentBytes = errors.New("max concurrent bytes must be non-negative")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -281,6 +282,7 @@ type BucketStoreConfig struct {
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
MaxConcurrentBytes int64 `yaml:"max_concurrent_bytes"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
Expand Down Expand Up @@ -365,6 +367,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.")
f.Int64Var(&cfg.MaxConcurrentBytes, "blocks-storage.bucket-store.max-concurrent-bytes", 0, "Max number of bytes being processed concurrently across all queries. When the limit is reached, new requests are rejected with HTTP 503. 0 to disable.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants syncing blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks syncing per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
Expand Down Expand Up @@ -429,6 +432,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
}
if cfg.MaxConcurrentBytes < 0 {
return ErrInvalidMaxConcurrentBytes
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errUnSupportedWALCompressionType,
},
"should fail on negative max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = -1
},
expectedErr: ErrInvalidMaxConcurrentBytes,
},
"should pass on zero max concurrent bytes (disabled)": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 0
},
expectedErr: nil,
},
"should pass on positive max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 1024 * 1024 * 1024 // 1GB
},
expectedErr: nil,
},
}

for testName, testData := range tests {
Expand Down
53 changes: 35 additions & 18 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ type ThanosBucketStores struct {
// Keeps number of inflight requests
inflightRequests *util.InflightRequestTracker

// Concurrent bytes tracker for limiting bytes being processed across all queries.
concurrentBytesTracker ConcurrentBytesTracker

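// Holder for the tracking bytes limiter registry of the request being served.
// It is used to ensure cleanup happens when requests complete.
// NOTE(review): this is a single holder shared by every Series call, while the registry is set and cleared per request — confirm concurrent requests cannot overwrite or clear each other's registry.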
trackingRegistryHolder *trackingLimiterRegistryHolder

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
Expand All @@ -105,19 +112,20 @@ type ThanosBucketStores struct {
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) {
// Note: concurrentBytesTracker is only forwarded to the TSDB bucket store implementation; the Parquet bucket store does not receive it.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (BucketStores, error) {
switch cfg.BucketStore.BucketStoreType {
case string(tsdb.ParquetBucketStore):
return newParquetBucketStores(cfg, bucketClient, limits, logger, reg)
case string(tsdb.TSDBBucketStore):
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg)
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg, concurrentBytesTracker)
default:
return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType)
}
}

// newThanosBucketStores creates new TSDB-based bucket stores.
func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) {
func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, concurrentBytesTracker ConcurrentBytesTracker) (*ThanosBucketStores, error) {
matchers := tsdb.NewMatchers()
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
if err != nil {
Expand All @@ -133,20 +141,22 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi
}).Set(float64(cfg.BucketStore.MaxConcurrent))

u := &ThanosBucketStores{
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
concurrentBytesTracker: concurrentBytesTracker,
trackingRegistryHolder: &trackingLimiterRegistryHolder{},
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -381,6 +391,13 @@ func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Stor
defer u.inflightRequests.Dec()
}

registry := newTrackingBytesLimiterRegistry()
u.trackingRegistryHolder.SetRegistry(registry)
defer func() {
u.trackingRegistryHolder.ClearRegistry()
registry.ReleaseAll()
}()

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
Expand Down Expand Up @@ -697,7 +714,7 @@ func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve, u.concurrentBytesTracker, u.trackingRegistryHolder),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func BenchmarkThanosBucketStores_SeriesBatch(b *testing.B) {
require.NoError(b, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(b, err)

// Perform Initial Sync to load blocks
Expand Down
28 changes: 14 additions & 14 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

if tc.mockInitialSync {
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// Query series before the initial sync.
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
bucket = &failFirstGetBucket{Bucket: bucket}

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// Initial sync should succeed even if a transient error occurs.
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// Run an initial sync to discover 1 block.
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
bucketClient.MockExists(path.Join(users.GlobalMarkersDir, "user-3", users.TenantDeletionMarkFile), false, nil)
bucketClient.MockExists(path.Join("user-3", "markers", users.TenantDeletionMarkFile), false, nil)

stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil)
stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil, nil)
require.NoError(t, err)

// Sync user stores and count the number of times the callback is called.
Expand Down Expand Up @@ -523,7 +523,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(ctx))

Expand Down Expand Up @@ -579,7 +579,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(context.Background()))

Expand All @@ -602,7 +602,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(context.Background()))

Expand Down Expand Up @@ -653,7 +653,7 @@ func TestBucketStores_SyncBlocksWithIgnoreBlocksBefore(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil),
objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// Perform initial sync
Expand Down Expand Up @@ -845,7 +845,7 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
sharding := userShardingStrategy{}

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
require.NoError(t, err)

// Perform sync.
Expand Down Expand Up @@ -944,7 +944,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) {
assert.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
assert.NoError(t, err)
thanosStores := stores.(*ThanosBucketStores)
assert.NotNil(t, thanosStores.instanceTokenBucket)
Expand All @@ -966,7 +966,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) {
cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDryRun)
sharding.users = []string{user1, user2}
reg = prometheus.NewPedanticRegistry()
stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
assert.NoError(t, err)
thanosStores = stores.(*ThanosBucketStores)
assert.NotNil(t, thanosStores.instanceTokenBucket)
Expand All @@ -978,7 +978,7 @@ func TestBucketStores_tokenBuckets(t *testing.T) {
cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDisabled)
sharding.users = []string{user1, user2}
reg = prometheus.NewPedanticRegistry()
stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
assert.NoError(t, err)

assert.NoError(t, stores.InitialSync(ctx))
Expand All @@ -1002,7 +1002,7 @@ func TestBucketStores_getTokensToRetrieve(t *testing.T) {
assert.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil)
assert.NoError(t, err)

thanosStores := stores.(*ThanosBucketStores)
Expand Down
Loading
Loading