diff --git a/.github/workflows/scripts/install-docker.sh b/.github/workflows/scripts/install-docker.sh index d6a215e9270..f5ac7f45e65 100755 --- a/.github/workflows/scripts/install-docker.sh +++ b/.github/workflows/scripts/install-docker.sh @@ -1,11 +1,11 @@ #!/bin/bash set -x -VER="20.10.19" +VER="29.2.1" curl -L -o /tmp/docker-$VER.tgz https://download.docker.com/linux/static/stable/x86_64/docker-$VER.tgz tar -xz -C /tmp -f /tmp/docker-$VER.tgz mkdir -vp ~/.docker/cli-plugins/ -curl --silent -L "https://github.com/docker/buildx/releases/download/v0.3.0/buildx-v0.3.0.linux-amd64" > ~/.docker/cli-plugins/docker-buildx +curl --silent -L "https://github.com/docker/buildx/releases/download/v0.31.1/buildx-v0.31.1.linux-amd64" > ~/.docker/cli-plugins/docker-buildx chmod a+x ~/.docker/cli-plugins/docker-buildx mv /tmp/docker/* /usr/bin docker run --privileged --rm tonistiigi/binfmt --install all diff --git a/.github/workflows/test-build-deploy.yml b/.github/workflows/test-build-deploy.yml index ad25c07f293..5d89e1e35c0 100644 --- a/.github/workflows/test-build-deploy.yml +++ b/.github/workflows/test-build-deploy.yml @@ -15,7 +15,7 @@ on: jobs: lint: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 container: image: quay.io/cortexproject/build-image:master-582c03a76 steps: @@ -44,7 +44,7 @@ jobs: run: make BUILD_IN_CONTAINER=false check-white-noise test: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 container: image: quay.io/cortexproject/build-image:master-582c03a76 steps: @@ -64,7 +64,7 @@ jobs: security: name: CodeQL - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 permissions: actions: read contents: read @@ -87,7 +87,7 @@ jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 container: image: quay.io/cortexproject/build-image:master-582c03a76 steps: @@ -132,7 +132,7 @@ jobs: integration: needs: build - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 strategy: fail-fast: false matrix: @@ -144,6 +144,7 @@ jobs: - integration_querier - integration_ruler - 
integration_query_fuzz + - integration_overrides steps: - name: Upgrade golang uses: actions/setup-go@v2 @@ -206,7 +207,7 @@ jobs: integration-configs-db: needs: build - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 steps: - name: Checkout Repo uses: actions/checkout@v2 @@ -228,7 +229,7 @@ jobs: deploy_website: needs: [build, test] if: (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/')) && github.repository == 'cortexproject/cortex' - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 container: image: quay.io/cortexproject/build-image:master-582c03a76 steps: @@ -270,7 +271,7 @@ jobs: deploy: needs: [build, test, lint, integration, integration-configs-db] if: (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/')) && github.repository == 'cortexproject/cortex' - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 container: image: quay.io/cortexproject/build-image:master-582c03a76 steps: diff --git a/docs/api/_index.md b/docs/api/_index.md index 3cbdaee76c7..e9fbfb350c6 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -63,6 +63,9 @@ For the sake of clarity, in this document we have grouped API endpoints by servi | [Delete Alertmanager configuration](#delete-alertmanager-configuration) | Alertmanager || `DELETE /api/v1/alerts` | | [Tenant delete request](#tenant-delete-request) | Purger || `POST /purger/delete_tenant` | | [Tenant delete status](#tenant-delete-status) | Purger || `GET /purger/delete_tenant_status` | +| [Get user overrides](#get-user-overrides) | Overrides || `GET /api/v1/user-overrides` | +| [Set user overrides](#set-user-overrides) | Overrides || `POST /api/v1/user-overrides` | +| [Delete user overrides](#delete-user-overrides) | Overrides || `DELETE /api/v1/user-overrides` | | [Store-gateway ring status](#store-gateway-ring-status) | Store-gateway || `GET /store-gateway/ring` | | [Compactor ring status](#compactor-ring-status) | Compactor || `GET /compactor/ring` | | [Get rule files](#get-rule-files) | 
Configs API (deprecated) || `GET /api/prom/configs/rules` | @@ -834,6 +837,64 @@ Returns status of tenant deletion. Output format to be defined. Experimental. _Requires [authentication](#authentication)._ +## Overrides + +The Overrides service provides an API for managing user overrides. + +### Get user overrides + +``` +GET /api/v1/user-overrides +``` + +Get the current overrides for the authenticated tenant. Returns the overrides in JSON format. + +_Requires [authentication](#authentication)._ + +### Set user overrides + +``` +POST /api/v1/user-overrides +``` + +Set or update overrides for the authenticated tenant. The request body should contain a JSON object with the override values. + +_Requires [authentication](#authentication)._ + +### Delete user overrides + +``` +DELETE /api/v1/user-overrides +``` + +Delete all overrides for the authenticated tenant. This will revert the tenant to using default values. + +_Requires [authentication](#authentication)._ + +#### Example request body for POST + +```json +{ + "ingestion_rate": 50000, + "max_global_series_per_user": 1000000, + "ruler_max_rules_per_rule_group": 100 +} +``` + +#### Supported limits + +The following limits can be modified via the API: +- `max_global_series_per_user` +- `max_global_series_per_metric` +- `ingestion_rate` +- `ingestion_burst_size` +- `ruler_max_rules_per_rule_group` +- `ruler_max_rule_groups_per_tenant` + +#### Hard limits + +Overrides are validated against hard limits defined in the runtime configuration file. If a requested override exceeds the hard limit for the tenant, the request will be rejected with a 400 status code.
+ ## Store-gateway ### Store-gateway ring status diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 238b04e300e..53b20bdb001 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -35,6 +35,8 @@ Cortex is an actively developed project and we want to encourage the introductio Currently experimental features are: +- Overrides API + - Runtime configuration API for managing tenant limits - S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides). - Azure blob storage. - Zone awareness based replication. diff --git a/docs/guides/overrides.md b/docs/guides/overrides.md new file mode 100644 index 00000000000..0c7d78df13e --- /dev/null +++ b/docs/guides/overrides.md @@ -0,0 +1,508 @@ +--- +title: "User Overrides API" +linkTitle: "User Overrides API" +weight: 11 +slug: overrides +--- + +The User Overrides API provides a RESTful interface for managing tenant-specific limit overrides at runtime without requiring manual edits to the runtime configuration file or service restarts. + +## Context + +Cortex is a multi-tenant system that applies resource limits to each tenant to prevent any single tenant from using too many resources. These limits can be configured globally via the `limits` section in the main configuration file, or per-tenant via the `runtime_config` file. + +Traditionally, updating per-tenant limits required: +1. Manually editing the runtime configuration file +2. Waiting for Cortex to reload the configuration (based on `reload-period`) +3. Direct access to the runtime configuration storage + +The User Overrides API simplifies this process by providing HTTP endpoints that allow authorized users or systems to: +- View current tenant overrides +- Set or update specific limit overrides +- Delete all overrides for a tenant + +## Architecture + +The overrides module runs as a service within Cortex and provides three main capabilities: + +1. 
**API Endpoints**: RESTful HTTP endpoints for managing overrides +2. **Validation**: Enforces allowed limits and hard limits from runtime configuration +3. **Merge Behavior**: Preserves existing overrides when updating specific limits + +## Configuration + +### Enabling the Overrides Module + +The `overrides` module must be explicitly enabled in Cortex. It is not included in the `all` target by default. + +```bash +# Run only the overrides module +cortex -target=overrides -runtime-config.file=runtime.yaml + +# Include overrides with other modules +cortex -target=overrides,query-frontend,querier -runtime-config.file=runtime.yaml +``` + +### Runtime Configuration File + +The runtime configuration file controls which limits can be modified via the API and sets upper bounds (hard limits) for tenant overrides. + +#### Basic Configuration + +```yaml +# file: runtime.yaml + +# Current tenant overrides +overrides: + tenant1: + ingestion_rate: 50000 + max_global_series_per_user: 1000000 + +# Limits that can be modified via the API +api_allowed_limits: + - ingestion_rate + - ingestion_burst_size + - max_global_series_per_user + - max_global_series_per_metric + - ruler_max_rules_per_rule_group + - ruler_max_rule_groups_per_tenant +``` + +#### Configuration with Hard Limits + +Hard limits prevent tenants from setting overrides above a specified maximum value: + +```yaml +# file: runtime.yaml + +# Current tenant overrides +overrides: + tenant1: + ingestion_rate: 50000 + max_global_series_per_user: 500000 + +# Allowed limits that can be modified via API +api_allowed_limits: + - ingestion_rate + - ingestion_burst_size + - max_global_series_per_user + - max_global_series_per_metric + - ruler_max_rules_per_rule_group + - ruler_max_rule_groups_per_tenant + +# Hard limits (maximum values) per tenant +hard_overrides: + tenant1: + ingestion_rate: 100000 + max_global_series_per_user: 2000000 + tenant2: + ingestion_rate: 200000 + max_global_series_per_user: 5000000 +``` + +### Storage 
Backend + +The overrides module uses the same storage backend as the runtime config. Configure it using the `runtime-config` section: + +```yaml +runtime_config: + period: 10s + file: runtime.yaml + + # For S3 backend + backend: s3 + s3: + bucket_name: cortex-runtime-config + endpoint: s3.amazonaws.com + access_key_id: ${AWS_ACCESS_KEY_ID} + secret_access_key: ${AWS_SECRET_ACCESS_KEY} + + # For GCS backend + # backend: gcs + # gcs: + # bucket_name: cortex-runtime-config + + # For filesystem backend (default) + # backend: filesystem + # filesystem: + # dir: /etc/cortex +``` + +## API Reference + +All endpoints require authentication using the `X-Scope-OrgID` header with the tenant ID. + +### Get User Overrides + +```http +GET /api/v1/user-overrides +X-Scope-OrgID: tenant1 +``` + +Returns the current overrides for the authenticated tenant in JSON format. + +**Response (200 OK):** +```json +{ + "ingestion_rate": 50000, + "max_global_series_per_user": 500000, + "ruler_max_rules_per_rule_group": 100 +} +``` + +**Response (404 Not Found):** +If the tenant has no overrides configured, an empty object is returned: +```json +{} +``` + +### Set User Overrides + +```http +POST /api/v1/user-overrides +X-Scope-OrgID: tenant1 +Content-Type: application/json + +{ + "ingestion_rate": 75000 +} +``` + +Sets or updates specific overrides for the authenticated tenant. This operation **merges** with existing overrides rather than replacing them entirely. + +**Merge Behavior Example:** + +Current state: +```json +{ + "ingestion_rate": 50000, + "max_global_series_per_user": 500000, + "ruler_max_rules_per_rule_group": 100 +} +``` + +Request: +```json +{ + "ingestion_rate": 75000 +} +``` + +Result: +```json +{ + "ingestion_rate": 75000, + "max_global_series_per_user": 500000, + "ruler_max_rules_per_rule_group": 100 +} +``` + +**Response (200 OK):** +Returns success with no body. 
+ +**Response (400 Bad Request):** +- Invalid limit names (not in `api_allowed_limits`) +- Values exceeding hard limits +- Invalid JSON format + +### Delete User Overrides + +```http +DELETE /api/v1/user-overrides +X-Scope-OrgID: tenant1 +``` + +Removes all overrides for the authenticated tenant. The tenant will revert to using global default values. + +**Response (200 OK):** +Returns success with no body. + +## API Allowed Limits + +The `api_allowed_limits` configuration in the runtime config file controls which limits can be modified via the API. This provides an additional security layer to prevent unauthorized modification of critical limits. + +### Configuring Allowed Limits + +```yaml +api_allowed_limits: + - ingestion_rate + - ingestion_burst_size + - max_global_series_per_user + - max_global_series_per_metric + - ruler_max_rules_per_rule_group + - ruler_max_rule_groups_per_tenant +``` + +### Validation Behavior + +When a POST request is made: + +1. **Allowed limits check**: Each limit in the request is validated against `api_allowed_limits` +2. **Rejection**: If any limit is not in the allowed list, the entire request is rejected with a 400 error + +**Example - Rejected Request:** + +Runtime config: +```yaml +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user +``` + +Request: +```json +{ + "ingestion_rate": 50000, + "max_series_per_query": 100000 +} +``` + +Response (400 Bad Request): +``` +the following limits cannot be modified via the overrides API: max_series_per_query +``` + +### Available Limit Names + +Limit names correspond to fields in the [`limits_config`](../configuration/config-file-reference.md#limits_config) section. 
Common examples include: + +- `ingestion_rate` +- `ingestion_burst_size` +- `max_global_series_per_user` +- `max_global_series_per_metric` +- `max_local_series_per_user` +- `max_local_series_per_metric` +- `max_series_per_query` +- `max_samples_per_query` +- `ruler_max_rules_per_rule_group` +- `ruler_max_rule_groups_per_tenant` +- `max_label_names_per_series` +- `max_label_name_length` +- `max_label_value_length` + +## Hard Limits + +Hard limits provide per-tenant upper bounds for override values. They prevent tenants from setting limits that exceed their allocated capacity. + +**Hard limits are inclusive** - if a hard limit is set to 100000, then values up to and including 100000 are allowed, but 100001 and above will be rejected. + +### Configuring Hard Limits + +Hard limits are specified per tenant in the `hard_overrides` section: + +```yaml +hard_overrides: + tenant1: + ingestion_rate: 100000 + max_global_series_per_user: 2000000 + tenant2: + ingestion_rate: 500000 + max_global_series_per_user: 10000000 +``` + +### Validation Behavior + +When a POST request is made: + +1. **Hard limit lookup**: System checks if the tenant has hard limits configured +2. **Value comparison**: Each requested override value is compared against its hard limit (inclusive) +3. **Rejection**: If any value exceeds its hard limit, the entire request is rejected + +**Example - Allowed Request (at hard limit):** + +Runtime config: +```yaml +hard_overrides: + tenant1: + ingestion_rate: 100000 + max_global_series_per_user: 2000000 +``` + +Request: +```json +{ + "ingestion_rate": 100000 +} +``` + +Response (200 OK): The request succeeds because 100000 equals the hard limit (inclusive). 
+ +**Example - Rejected Request (exceeds hard limit):** + +Runtime config: +```yaml +hard_overrides: + tenant1: + ingestion_rate: 100000 + max_global_series_per_user: 2000000 +``` + +Request: +```json +{ + "ingestion_rate": 100001 +} +``` + +Response (400 Bad Request): +``` +limit ingestion_rate exceeds hard limit: 100001 > 100000 +``` + +### Hard Limits vs. Default Limits + +| Configuration | Purpose | Scope | +|--------------|---------|-------| +| Default limits (`limits_config`) | Global defaults for all tenants | All tenants | +| Overrides (`overrides`) | Per-tenant custom limits | Specific tenants | +| Hard limits (`hard_overrides`) | Maximum allowed override values (inclusive) | Specific tenants | + +**Hierarchy:** +``` +Default Limits (global) + ↓ +Tenant Overrides (per-tenant, must be ≤ hard limits) + ↓ +Hard Limits (per-tenant maximum, inclusive) +``` + +## Usage Examples + +### Example 1: Initial Override Setup + +Set initial overrides for a new tenant: + +```bash +curl -X POST http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" \ + -H "Content-Type: application/json" \ + -d '{ + "ingestion_rate": 50000, + "max_global_series_per_user": 1000000, + "ruler_max_rules_per_rule_group": 50 + }' +``` + +### Example 2: Update Specific Limit + +Update only the ingestion rate while preserving other overrides: + +```bash +curl -X POST http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" \ + -H "Content-Type: application/json" \ + -d '{ + "ingestion_rate": 75000 + }' +``` + +Result: `ingestion_rate` updated to 75000, other limits remain unchanged. 
+ +### Example 3: View Current Overrides + +```bash +curl -X GET http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" +``` + +Response: +```json +{ + "ingestion_rate": 75000, + "max_global_series_per_user": 1000000, + "ruler_max_rules_per_rule_group": 50 +} +``` + +### Example 4: Remove All Overrides + +```bash +curl -X DELETE http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" +``` + +Result: Tenant reverts to global default limits. + +### Example 5: Handling Validation Errors + +Attempt to set a disallowed limit: + +```bash +curl -X POST http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" \ + -H "Content-Type: application/json" \ + -d '{ + "ingestion_rate": 50000, + "some_invalid_limit": 100 + }' +``` + +Response (400): +``` +the following limits cannot be modified via the overrides API: some_invalid_limit +``` + +Attempt to exceed hard limit: + +```bash +curl -X POST http://cortex:8080/api/v1/user-overrides \ + -H "X-Scope-OrgID: tenant1" \ + -H "Content-Type: application/json" \ + -d '{ + "ingestion_rate": 100001 + }' +``` + +Response (400): +``` +limit ingestion_rate exceeds hard limit: 100001 > 100000 +``` + +Note: If the hard limit is 100000, then 100000 itself would be allowed (hard limits are inclusive), but 100001 exceeds it. + +## Operational Considerations + +### Security + +- **Authentication**: All endpoints require valid tenant authentication via `X-Scope-OrgID` header +- **Authorization**: Tenants can only manage their own overrides +- **Allowed limits**: Use `api_allowed_limits` to restrict which limits can be modified +- **Hard limits**: Use `hard_overrides` to enforce maximum values per tenant + +### High Availability + +- The overrides module can run on multiple instances for high availability +- All instances read/write to the same runtime configuration storage backend +- Changes are eventually consistent based on `runtime-config.period` + +### Best Practices + +1. 
**Use hard limits**: Always configure `hard_overrides` to prevent runaway resource usage +2. **Restrict allowed limits**: Only expose limits via `api_allowed_limits` that are safe for self-service +3. **Monitor changes**: Track when overrides are modified and by whom +4. **Version control**: Keep runtime configuration in version control for audit trail +5. **Gradual rollout**: Test override changes in non-production environments first + +## Troubleshooting + +### "user not found" error when getting overrides + +This means the tenant has no overrides configured. This is normal for new tenants. An empty JSON object `{}` is returned. + +### Changes not taking effect immediately + +The runtime config is reloaded periodically based on `runtime-config.period` (default: 10s). Changes may take up to this duration to be applied. + +### "failed to validate hard limits" error + +This indicates an issue reading or parsing the runtime configuration file. Check: +- Runtime config file is accessible +- YAML syntax is valid +- Storage backend is properly configured + +## See Also + +- [Limits Configuration Reference](../configuration/config-file-reference.md#limits_config) +- [Runtime Configuration](../configuration/arguments.md#runtime-configuration-file) +- [Overrides Exporter](./overrides-exporter.md) +- [API Documentation](../api/_index.md#overrides) diff --git a/docs/proposals/user-overrides-api.md b/docs/proposals/user-overrides-api.md new file mode 100644 index 00000000000..cff4c098c99 --- /dev/null +++ b/docs/proposals/user-overrides-api.md @@ -0,0 +1,77 @@ +--- +title: "User Overrides API" +linkTitle: "User Overrides API" +weight: 1 +slug: overrides-api +--- + +- Author: Bogdan Stancu +- Date: June 2025 +- Status: Proposed + +## Overview + +This proposal outlines the design for a new API endpoint that will allow users to modify their current limits in Cortex. 
Currently, overrides can only be changed by administrators modifying the runtime configuration file and waiting for it to be reloaded. + +## Problem + +Currently, when users need limit adjustments, administrators must: +1. Manually edit the runtime configuration file +2. Coordinate with users to verify the changes +3. Potentially repeat this process multiple times to find the right balance + +This manual process is time-consuming, error-prone, and doesn't scale well with a large number of users. By offering a self-service API, users can adjust their own limits within predefined boundaries, reducing the administrative overhead and improving the user experience. + +## Proposed API Design + +### Endpoints + +#### 1. GET /api/v1/user-overrides +Returns the current overrides configuration for a specific tenant. + +Response format: +```json +{ + "ingestion_rate": 10000, + "ingestion_burst_size": 20000, + "max_global_series_per_user": 1000000, + "max_global_series_per_metric": 200000, + ... +} +``` + +#### 2. POST /api/v1/user-overrides +Updates overrides for a specific tenant. The request body should contain only the overrides that need to be updated. + +Request body: +```json +{ + "ingestion_rate": 10000, + "max_series_per_metric": 100000 +} +``` + +#### 3. DELETE /api/v1/user-overrides +Removes tenant-specific overrides, reverting to the default limits. + +### Implementation Details + +1. The API will be integrated into the cortex-overrides component to: + - Read the current runtime config from the configured storage backend + - Persist changes back to the storage backend + - The API will only work with configurations stored in block storage backends. + +2. Security: + - Rate limiting will be implemented to prevent abuse + - Changes will be validated before being applied + + +3.
Error Handling: + - Invalid limit values will return 400 Bad Request + - Storage backend errors will return 500 Internal Server Error + +### Open Questions: + - How do we implement a hard-limit configuration to avoid users + setting unreasonable limits? + - What set of overrides can be configurable through this API? + Limits like `shard_size` should only be modified by the admin. \ No newline at end of file diff --git a/integration/overrides_test.go b/integration/overrides_test.go new file mode 100644 index 00000000000..5160ff93994 --- /dev/null +++ b/integration/overrides_test.go @@ -0,0 +1,416 @@ +//go:build integration_overrides + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore/providers/s3" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func defaultArgsOverrides(minio *e2e.HTTPService) map[string]string { + return map[string]string{ + "-log.level": "debug", + "-target": "overrides", + "-runtime-config.file": "runtime.yaml", + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": "cortex", + "-runtime-config.s3.endpoint": minio.NetworkHTTPEndpoint(), + "-runtime-config.s3.insecure": "true", + } +} + +func uploadRuntimeConfig(t *testing.T, minio *e2e.HTTPService, runtimeConfig interface{}) { + runtimeConfigData, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + s3Client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: "cortex", + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "overrides-test") + require.NoError(t, err) 
+ require.NoError(t, s3Client.Upload(context.Background(), "runtime.yaml", bytes.NewReader(runtimeConfigData))) +} + +func TestOverridesAPIWithRunningCortex(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9000, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user1": map[string]interface{}{ + "ingestion_rate": 5000, + }, + }, + "api_allowed_limits": []string{ + "ingestion_rate", + "max_global_series_per_user", + "max_global_series_per_metric", + "ingestion_burst_size", + "ruler_max_rules_per_rule_group", + "ruler_max_rule_groups_per_tenant", + }, + } + uploadRuntimeConfig(t, minio, runtimeConfig) + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides", defaultArgsOverrides(minio), "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("GET overrides for existing user", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var overrides map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&overrides) + require.NoError(t, err) + + assert.Equal(t, float64(5000), overrides["ingestion_rate"]) + }) + + t.Run("GET overrides for non-existing user", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user2") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides for new user", func(t *testing.T) { + newOverrides := 
map[string]interface{}{ + "ingestion_rate": 6000, + "ingestion_burst_size": 7000, + } + requestBody, err := json.Marshal(newOverrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user3") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + req, err = http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user3") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var savedOverrides map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&savedOverrides) + require.NoError(t, err) + + assert.Equal(t, float64(6000), savedOverrides["ingestion_rate"]) + assert.Equal(t, float64(7000), savedOverrides["ingestion_burst_size"]) + }) + + t.Run("POST overrides with invalid limit", func(t *testing.T) { + invalidOverrides := map[string]interface{}{ + "invalid_limit": 5000, + } + requestBody, err := json.Marshal(invalidOverrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user4") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides with invalid JSON", func(t *testing.T) { + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader([]byte("invalid 
json"))) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user5") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("DELETE overrides", func(t *testing.T) { + req, err := http.NewRequest("DELETE", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + req, err = http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} + +func TestOverridesAPIHardLimits(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9001, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + // Runtime config with hard limits + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{}, + "hard_overrides": map[string]interface{}{ + "user1": map[string]interface{}{ + "ingestion_rate": 10000, + "max_global_series_per_user": 50000, + }, + }, + "api_allowed_limits": []string{ + "ingestion_rate", + "max_global_series_per_user", + }, + } + uploadRuntimeConfig(t, minio, runtimeConfig) + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides-hard-limits", defaultArgsOverrides(minio), "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("POST overrides within hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 5000, // Within 
hard limit of 10000 + "max_global_series_per_user": 25000, // Within hard limit of 50000 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("POST overrides exceeding hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 15000, // Exceeds hard limit of 10000 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("POST overrides for user without hard limits", func(t *testing.T) { + overrides := map[string]interface{}{ + "ingestion_rate": 20000, // No hard limits for user2 + } + requestBody, err := json.Marshal(overrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user2") + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} + +func TestOverridesAPIWithS3Error(t *testing.T) { + s, err := 
e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9001, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user1": map[string]interface{}{ + "ingestion_rate": 5000, + }, + }, + "api_allowed_limits": []string{ + "ingestion_rate", + "max_global_series_per_user", + "max_global_series_per_metric", + "ingestion_burst_size", + "ruler_max_rules_per_rule_group", + "ruler_max_rule_groups_per_tenant", + }, + } + uploadRuntimeConfig(t, minio, runtimeConfig) + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides-s3-error", defaultArgsOverrides(minio), "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + // Delete runtime.yaml from S3 to simulate file being missing + s3Client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: "cortex", + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "overrides-test-delete") + require.NoError(t, err) + require.NoError(t, s3Client.Delete(context.Background(), "runtime.yaml")) + + t.Run("GET overrides when runtime.yaml is missing", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + }) + + t.Run("POST overrides when runtime.yaml is missing", func(t *testing.T) { + newOverrides := map[string]interface{}{ + "ingestion_rate": 6000, + } + requestBody, err := json.Marshal(newOverrides) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", bytes.NewReader(requestBody)) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + 
req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + }) + + t.Run("DELETE overrides when runtime.yaml is missing", func(t *testing.T) { + req, err := http.NewRequest("DELETE", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "user1") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + }) + + require.NoError(t, s.Stop(cortexSvc)) +} + +func TestOverridesAPITenantExtraction(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9010, "cortex") + require.NoError(t, s.StartAndWaitReady(minio)) + + // Upload an empty runtime config file to S3 + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{}, + } + uploadRuntimeConfig(t, minio, runtimeConfig) + + cortexSvc := e2ecortex.NewSingleBinary("cortex-overrides-tenant", defaultArgsOverrides(minio), "") + require.NoError(t, s.StartAndWaitReady(cortexSvc)) + + t.Run("no tenant header", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + }) + + t.Run("empty tenant header", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://"+cortexSvc.HTTPEndpoint()+"/api/v1/user-overrides", nil) + require.NoError(t, err) + req.Header.Set("X-Scope-OrgID", "") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + }) + + 
require.NoError(t, s.Stop(cortexSvc)) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 97c7c44095c..3a77dd9cf1f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -29,6 +29,7 @@ import ( frontendv2 "github.com/cortexproject/cortex/pkg/frontend/v2" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" @@ -354,6 +355,14 @@ func (a *API) RegisterRulerAPI(r *ruler.API) { a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/rules/{namespace}"), http.HandlerFunc(r.DeleteNamespace), true, "DELETE") } +// RegisterOverrides registers routes associated with the Overrides API +func (a *API) RegisterOverrides(o *overrides.API) { + // Register individual overrides API routes with the main API + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.GetOverrides), true, "GET") + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.SetOverrides), true, "POST") + a.RegisterRoute("/api/v1/user-overrides", http.HandlerFunc(o.DeleteOverrides), true, "DELETE") +} + // RegisterRing registers the ring UI page associated with the distributor for writes. 
func (a *API) RegisterRing(r *ring.Ring) { a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/ring", "Ingester Ring Status") diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 1607bb525cc..3a50e4c7da9 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -38,6 +38,7 @@ import ( frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/tenantfederation" "github.com/cortexproject/cortex/pkg/querier/tripperware" @@ -300,7 +301,8 @@ type Cortex struct { Server *server.Server Ring *ring.Ring TenantLimits validation.TenantLimits - Overrides *validation.Overrides + OverridesConfig *validation.Overrides + Overrides *overrides.API Distributor *distributor.Distributor Ingester *ingester.Ingester Flusher *flusher.Flusher diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index de1f15d2604..d80da6e9e30 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -34,6 +34,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/tenantfederation" @@ -60,6 +61,7 @@ const ( API string = "api" Ring string = "ring" RuntimeConfig string = "runtime-config" + OverridesConfig string = "overrides-config" Overrides string = "overrides" OverridesExporter string = "overrides-exporter" Server string = "server" @@ -175,13 +177,26 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { return serv, err } -func (t *Cortex) initOverrides() (serv services.Service, err error) { - t.Overrides, err = 
validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) +func (t *Cortex) initOverridesConfig() (serv services.Service, err error) { + t.OverridesConfig, err = validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) // overrides don't have operational state, nor do they need to do anything more in starting/stopping phase, // so there is no need to return any service. return nil, err } +func (t *Cortex) initOverrides() (services.Service, error) { + + overridesAPI, err := overrides.New(t.Cfg.RuntimeConfig, util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, fmt.Errorf("failed to create overrides API: %w", err) + } + t.Overrides = overridesAPI + + t.API.RegisterOverrides(overridesAPI) + + return overridesAPI, nil +} + func (t *Cortex) initOverridesExporter() (services.Service, error) { if t.Cfg.isModuleEnabled(OverridesExporter) && t.TenantLimits == nil { // This target isn't enabled by default ("all") and requires per-tenant limits to @@ -208,7 +223,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { // ruler's dependency) canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All) - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer, util_log.Logger) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.OverridesConfig, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return } @@ -228,7 +243,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, 
util_log.Logger) + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger) // Register the default endpoints that are always enabled for the querier module t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor) @@ -359,7 +374,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) { var servs []services.Service //nolint:revive // I prefer this form over removing 'else', because it allows q to have smaller scope. - if q, err := initQueryableForEngine(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil { + if q, err := initQueryableForEngine(t.Cfg, t.OverridesConfig, prometheus.DefaultRegisterer); err != nil { return nil, fmt.Errorf("failed to initialize querier: %v", err) } else { t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q)) @@ -406,7 +421,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return } @@ -426,7 +441,7 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) { t.Flusher, err = flusher.New( t.Cfg.Flusher, t.Cfg.Ingester, - t.Overrides, + t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, ) @@ -449,7 +464,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, - t.Overrides, + t.OverridesConfig, queryrange.PrometheusResponseExtractor{}, prometheus.DefaultRegisterer, queryAnalyzer, @@ -461,7 +476,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, 
err erro return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.OverridesConfig, queryAnalyzer, t.Cfg.Querier.LookbackDelta) if err != nil { return nil, err } @@ -473,7 +488,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantQueryMiddlewares, prometheusCodec, instantquery.InstantQueryCodec, - t.Overrides, + t.OverridesConfig, queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, @@ -491,7 +506,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { retry := transport.NewRetry(t.Cfg.QueryRange.MaxRetries, prometheus.DefaultRegisterer) - roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry) + roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.OverridesConfig, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry) if err != nil { return nil, err } @@ -526,7 +541,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { return } - t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer) + t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.OverridesConfig, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer) return } @@ -578,14 +593,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = promql.NewEngine(opts) } - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, 
t.Overrides, metrics, prometheus.DefaultRegisterer) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } else { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger) + queryable, _, engine := querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } @@ -599,7 +614,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { prometheus.DefaultRegisterer, util_log.Logger, t.RulerStorage, - t.Overrides, + t.OverridesConfig, ) if err != nil { return @@ -634,12 +649,12 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort // Initialise the store. 
- store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } - t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } @@ -651,7 +666,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.OverridesConfig) if err != nil { return } @@ -664,7 +679,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) { func (t *Cortex) initStoreGateway() (serv services.Service, err error) { t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, t.Cfg.Server.LogLevel, util_log.Logger, prometheus.DefaultRegisterer) + t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.OverridesConfig, t.Cfg.Server.LogLevel, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -705,7 +720,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { // 
t.RulerStorage can be nil when running in single-binary mode, and rule storage is not configured. - tenantDeletionAPI, err := purger.NewTenantDeletionAPI(t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + tenantDeletionAPI, err := purger.NewTenantDeletionAPI(t.Cfg.BlocksStorage, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -715,7 +730,7 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { } func (t *Cortex) initQueryScheduler() (services.Service, error) { - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.OverridesConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, errors.Wrap(err, "query-scheduler init") } @@ -734,7 +749,8 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule) mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule) mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule) - mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) + mm.RegisterModule(OverridesConfig, t.initOverridesConfig, modules.UserInvisibleModule) + mm.RegisterModule(Overrides, t.initOverrides) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) @@ -764,32 +780,33 @@ func (t *Cortex) setupModuleManager() error { MemberlistKV: {API}, RuntimeConfig: {API}, Ring: {API, RuntimeConfig, MemberlistKV}, - Overrides: {RuntimeConfig}, + OverridesConfig: {RuntimeConfig}, + Overrides: {API, OverridesConfig}, OverridesExporter: {RuntimeConfig}, Distributor: {DistributorService, API}, - DistributorService: {Ring, Overrides}, - Ingester: {IngesterService, 
Overrides, API}, - IngesterService: {Overrides, RuntimeConfig, MemberlistKV}, - Flusher: {Overrides, API}, - Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV}, + DistributorService: {Ring, OverridesConfig}, + Ingester: {IngesterService, OverridesConfig, API}, + IngesterService: {OverridesConfig, RuntimeConfig, MemberlistKV}, + Flusher: {OverridesConfig, API}, + Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV}, Querier: {TenantFederation}, - StoreQueryable: {Overrides, Overrides, MemberlistKV}, - QueryFrontendTripperware: {API, Overrides}, + StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV}, + QueryFrontendTripperware: {API, OverridesConfig}, QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {API, Overrides}, - Ruler: {DistributorService, Overrides, StoreQueryable, RulerStorage}, - RulerStorage: {Overrides}, + QueryScheduler: {API, OverridesConfig}, + Ruler: {DistributorService, OverridesConfig, StoreQueryable, RulerStorage}, + RulerStorage: {OverridesConfig}, Configs: {API}, - AlertManager: {API, MemberlistKV, Overrides}, - Compactor: {API, MemberlistKV, Overrides}, - StoreGateway: {API, Overrides, MemberlistKV}, - TenantDeletion: {API, Overrides}, + AlertManager: {API, MemberlistKV, OverridesConfig}, + Compactor: {API, MemberlistKV, OverridesConfig}, + StoreGateway: {API, OverridesConfig, MemberlistKV}, + TenantDeletion: {API, OverridesConfig}, Purger: {TenantDeletion}, TenantFederation: {Queryable}, All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler}, } if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { - deps[Ruler] = []string{Overrides, RulerStorage} + deps[Ruler] = []string{OverridesConfig, RulerStorage} } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil { diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 
3d612c7cc88..2870e140e6d 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -18,18 +18,8 @@ var ( errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents") ) -// RuntimeConfigValues are values that can be reloaded from configuration file while Cortex is running. -// Reloading is done by runtime_config.Manager, which also keeps the currently loaded config. -// These values are then pushed to the components that are interested in them. -type RuntimeConfigValues struct { - TenantLimits map[string]*validation.Limits `yaml:"overrides"` - - Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` - - IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` - - IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` -} +// avoid circular imports +type RuntimeConfigValues = runtimeconfig.RuntimeConfigValues // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager // that reads limits from a configuration file on disk and periodically reloads them. 
diff --git a/pkg/overrides/api.go b/pkg/overrides/api.go new file mode 100644 index 00000000000..806cb9f67ee --- /dev/null +++ b/pkg/overrides/api.go @@ -0,0 +1,288 @@ +package overrides + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "maps" + "net/http" + + "github.com/go-kit/log/level" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +const ( + // Error messages + ErrInvalidJSON = "invalid JSON" + ErrUserNotFound = "user not found" + + // Runtime config errors + ErrRuntimeConfig = "runtime config read error" +) + +// getAllowedLimitsFromBucket reads allowed limits from the runtime config file +func (a *API) getAllowedLimitsFromBucket(ctx context.Context) ([]string, error) { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + defer func() { + if reader != nil { + reader.Close() + } + }() + if err != nil { + return nil, err + } + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + level.Error(a.logger).Log("msg", "failed to decode runtime config", "err", err) + return nil, fmt.Errorf("failed to decode runtime config") + } + + return config.APIAllowedLimits, nil +} + +// GetOverrides retrieves overrides for a specific tenant +func (a *API) GetOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Read overrides from bucket storage + overrides, err := a.getOverridesFromBucket(r.Context(), userID) + if err != nil { + if err.Error() == ErrUserNotFound { + level.Info(a.logger).Log("msg", "User not found", "user", userID) + http.Error(w, "user not found", http.StatusBadRequest) + } else { + level.Error(a.logger).Log("msg", "failed to get overrides from bucket", "userID", userID, "err", 
err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } + return + } + if len(overrides) == 0 { + http.Error(w, "not found", http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(overrides); err != nil { + level.Error(a.logger).Log("msg", "failed to encode overrides response", "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } +} + +// SetOverrides updates overrides for a specific tenant +func (a *API) SetOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var overrides map[string]any + if err := json.NewDecoder(r.Body).Decode(&overrides); err != nil { + http.Error(w, ErrInvalidJSON, http.StatusBadRequest) + return + } + + // Get allowed limits from runtime config + allowedLimits, err := a.getAllowedLimitsFromBucket(r.Context()) + if err != nil { + level.Error(a.logger).Log("msg", "failed to get allowed limits from bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Validate that only allowed limits are being changed + if err := ValidateOverrides(overrides, allowedLimits); err != nil { + level.Error(a.logger).Log("msg", "invalid overrides validation", "userID", userID, "err", err) + http.Error(w, "Invalid overrides", http.StatusBadRequest) + return + } + + // Validate that values don't exceed hard limits from runtime config + if err := a.validateHardLimits(overrides, userID); err != nil { + level.Error(a.logger).Log("msg", "hard limits validation failed", "userID", userID, "err", err) + http.Error(w, "Invalid overrides", http.StatusBadRequest) + return + } + + // Write overrides to bucket storage + if err := a.setOverridesToBucket(r.Context(), userID, overrides); err != nil { + 
level.Error(a.logger).Log("msg", "failed to set overrides to bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// DeleteOverrides removes tenant-specific overrides +func (a *API) DeleteOverrides(w http.ResponseWriter, r *http.Request) { + userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := a.deleteOverridesFromBucket(r.Context(), userID); err != nil { + level.Error(a.logger).Log("msg", "failed to delete overrides from bucket", "userID", userID, "err", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// getOverridesFromBucket reads overrides for a specific tenant from the runtime config file +func (a *API) getOverridesFromBucket(ctx context.Context, userID string) (map[string]any, error) { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + defer func() { + if reader != nil { + reader.Close() + } + }() + if err != nil { + return nil, fmt.Errorf("failed to get runtime config: %w", err) + } + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + return nil, fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + if config.TenantLimits != nil { + if tenantLimits, exists := config.TenantLimits[userID]; exists { + // Use YAML marshaling to convert validation.Limits to map[string]interface{} + // This follows the same pattern as the existing runtime config handler + yamlData, err := yaml.Marshal(tenantLimits) + if err != nil { + return nil, fmt.Errorf("failed to marshal limits: %w", err) + } + + var result map[string]any + if err := yaml.Unmarshal(yamlData, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal limits: %w", err) + } + + return result, nil + } + // User does not exist in config - 
// mergeLimits merges new overrides into existing limits.
//
// When existing is nil the overrides map itself is returned (not a copy).
// Otherwise a new map is returned that holds every entry of existing, with
// entries from overrides taking precedence on key collisions; neither input
// is modified.
func mergeLimits(existing map[string]any, overrides map[string]any) map[string]any {
	if existing == nil {
		return overrides
	}

	// Start from a shallow copy of the existing limits, then layer the new
	// values on top.
	combined := maps.Clone(existing)
	maps.Copy(combined, overrides)

	return combined
}
var limits validation.Limits + if err := yaml.Unmarshal(yamlData, &limits); err != nil { + return fmt.Errorf("invalid overrides format: %w", err) + } + + config.TenantLimits[userID] = &limits + + data, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + return a.bucketClient.Upload(ctx, a.runtimeConfigPath, bytes.NewReader(data)) +} + +// deleteOverridesFromBucket removes overrides for a specific tenant from the runtime config file +func (a *API) deleteOverridesFromBucket(ctx context.Context, userID string) error { + reader, err := a.bucketClient.Get(ctx, a.runtimeConfigPath) + defer func() { + if reader != nil { + reader.Close() + } + }() + if err != nil { + return fmt.Errorf("failed to get runtime config: %w", err) + } + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + if config.TenantLimits != nil { + delete(config.TenantLimits, userID) + } + + data, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("%s: %w", ErrRuntimeConfig, err) + } + + return a.bucketClient.Upload(ctx, a.runtimeConfigPath, bytes.NewReader(data)) +} diff --git a/pkg/overrides/limits.go b/pkg/overrides/limits.go new file mode 100644 index 00000000000..40b29a867e9 --- /dev/null +++ b/pkg/overrides/limits.go @@ -0,0 +1,131 @@ +package overrides + +import ( + "context" + "fmt" + "slices" + "strconv" + "strings" + + "github.com/go-kit/log/level" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" +) + +const ( + // Error messages + ErrInvalidLimits = "the following limits cannot be modified via the overrides API" +) + +// No default allowed limits - these must be configured via runtime config + +// ValidateOverrides checks if the provided overrides only contain allowed limits +func ValidateOverrides(overrides map[string]any, allowedLimits []string) error { + var invalidLimits 
[]string + + for limitName := range overrides { + if !slices.Contains(allowedLimits, limitName) { + invalidLimits = append(invalidLimits, limitName) + } + } + + if len(invalidLimits) > 0 { + return fmt.Errorf("%s: %s", ErrInvalidLimits, strings.Join(invalidLimits, ", ")) + } + + return nil +} + +// validateHardLimits checks if the provided overrides exceed any hard limits from the runtime config +func (a *API) validateHardLimits(overrides map[string]any, userID string) error { + // Read the runtime config to get hard limits + reader, err := a.bucketClient.Get(context.Background(), a.runtimeConfigPath) + defer func() { + if reader != nil { + reader.Close() + } + }() + if err != nil { + level.Error(a.logger).Log("msg", "failed to read hard limits configuration", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + var config runtimeconfig.RuntimeConfigValues + if err := yaml.NewDecoder(reader).Decode(&config); err != nil { + level.Error(a.logger).Log("msg", "failed to decode hard limits configuration", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + // If no hard overrides are defined, allow the request + if config.HardTenantLimits == nil { + return nil + } + + // Get hard limits for this specific user + userHardLimits, exists := config.HardTenantLimits[userID] + if !exists { + return nil // No hard limits defined for this user + } + + yamlData, err := yaml.Marshal(userHardLimits) + if err != nil { + level.Error(a.logger).Log("msg", "failed to marshal hard limits", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + var hardLimitsMap map[string]any + if err := yaml.Unmarshal(yamlData, &hardLimitsMap); err != nil { + level.Error(a.logger).Log("msg", "failed to unmarshal hard limits", "userID", userID, "err", err) + return fmt.Errorf("failed to validate hard limits") + } + + // Validate each override against the user's hard limits + for limitName, 
// convertToFloat64 converts a decoded limit value to float64 so that two
// values of different dynamic types can be compared numerically.
//
// Every built-in integer and floating-point kind is accepted, as is a string
// holding a number in the syntax understood by strconv.ParseFloat. Accepting
// the full set of numeric kinds matters because callers treat a conversion
// error as "cannot compare": a number that arrives as, say, an unsigned or
// fixed-width integer would otherwise never be checked.
//
// Any other type (including nil, booleans, slices and maps) yields an
// "unsupported type" error; a non-numeric string yields the strconv error.
func convertToFloat64(v any) (float64, error) {
	switch val := v.(type) {
	case float64:
		return val, nil
	case float32:
		return float64(val), nil
	case int:
		return float64(val), nil
	case int8:
		return float64(val), nil
	case int16:
		return float64(val), nil
	case int32:
		return float64(val), nil
	case int64:
		return float64(val), nil
	case uint:
		return float64(val), nil
	case uint8:
		return float64(val), nil
	case uint16:
		return float64(val), nil
	case uint32:
		return float64(val), nil
	case uint64:
		return float64(val), nil
	case string:
		return strconv.ParseFloat(val, 64)
	default:
		return 0, fmt.Errorf("unsupported type: %T", v)
	}
}
configuration" + ErrFailedToCreateBucketClient = "failed to create bucket client for overrides" +) + +type API struct { + services.Service + cfg runtimeconfig.Config + logger log.Logger + registerer prometheus.Registerer + bucketClient objstore.Bucket + runtimeConfigPath string +} + +func New(cfg runtimeconfig.Config, logger log.Logger, registerer prometheus.Registerer) (*API, error) { + if err := cfg.StorageConfig.Validate(); err != nil { + return nil, fmt.Errorf("%s: %w", ErrInvalidOverridesConfiguration, err) + } + + api := &API{ + cfg: cfg, + logger: logger, + registerer: registerer, + } + api.Service = services.NewBasicService(api.starting, api.running, api.stopping) + return api, nil +} + +func (a *API) starting(ctx context.Context) error { + level.Info(a.logger).Log( + "msg", "overrides API starting", + "runtime_config_file", a.cfg.LoadPath, + "backend", a.cfg.StorageConfig.Backend, + "s3_endpoint", a.cfg.StorageConfig.S3.Endpoint, + "response_header_timeout", a.cfg.StorageConfig.S3.HTTP.ResponseHeaderTimeout, + "idle_conn_timeout", a.cfg.StorageConfig.S3.HTTP.IdleConnTimeout, + "tls_handshake_timeout", a.cfg.StorageConfig.S3.HTTP.TLSHandshakeTimeout, + ) + + bucketClient, err := bucket.NewClient(ctx, a.cfg.StorageConfig, "overrides", a.logger, a.registerer) + if err != nil { + level.Error(a.logger).Log("msg", ErrFailedToCreateBucketClient, "err", err) + return fmt.Errorf("%s: %w", ErrFailedToCreateBucketClient, err) + } + a.bucketClient = bucketClient + + a.runtimeConfigPath = a.cfg.LoadPath + + level.Info(a.logger).Log("msg", "overrides API started successfully", "backend", a.cfg.StorageConfig.Backend) + return nil +} + +func (a *API) running(ctx context.Context) error { + level.Info(a.logger).Log("msg", "overrides API is now running and ready to handle requests") + + <-ctx.Done() + + level.Info(a.logger).Log("msg", "overrides API received shutdown signal") + return nil +} + +func (a *API) stopping(err error) error { + if err != nil { + 
level.Error(a.logger).Log("msg", "overrides API stopping due to error", "err", err) + } else { + level.Info(a.logger).Log("msg", "overrides API stopping gracefully") + } + + // Close bucket client to release resources + if a.bucketClient != nil { + if closeErr := a.bucketClient.Close(); closeErr != nil { + level.Warn(a.logger).Log("msg", "failed to close bucket client", "err", closeErr) + } + } + + level.Info(a.logger).Log("msg", "overrides API stopped") + return nil +} diff --git a/pkg/overrides/overrides_test.go b/pkg/overrides/overrides_test.go new file mode 100644 index 00000000000..3cc505855c6 --- /dev/null +++ b/pkg/overrides/overrides_test.go @@ -0,0 +1,654 @@ +package overrides + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/services" +) + +func TestConfig_Validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { + initConfig func(*runtimeconfig.Config) + expected error + }{ + "default config should pass": { + initConfig: func(cfg *runtimeconfig.Config) { + // Set default values for bucket config + flagext.DefaultValues(&cfg.StorageConfig) + }, + expected: nil, + }, + "s3 config should pass": { + initConfig: func(cfg *runtimeconfig.Config) { + cfg.StorageConfig = bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: 
true, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + }, + expected: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + cfg := runtimeconfig.Config{} + + testData.initConfig(&cfg) + + if testData.expected == nil { + assert.NoError(t, cfg.StorageConfig.Validate()) + } else { + assert.ErrorIs(t, cfg.StorageConfig.Validate(), testData.expected) + } + }) + } +} + +func TestConfig_RegisterFlags(t *testing.T) { + cfg := runtimeconfig.Config{} + + // Test that flags are registered without panicking + require.NotPanics(t, func() { + flagSet := flag.NewFlagSet("test", flag.PanicOnError) + cfg.RegisterFlags(flagSet) + }) +} + +func TestNew(t *testing.T) { + tests := map[string]struct { + cfg runtimeconfig.Config + expectError bool + }{ + "valid config should create API": { + cfg: func() runtimeconfig.Config { + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + return cfg + }(), + expectError: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + api, err := New(testData.cfg, log.Logger, prometheus.DefaultRegisterer) + + if testData.expectError { + assert.Error(t, err) + assert.Nil(t, api) + } else { + assert.NoError(t, err) + assert.NotNil(t, api) + } + }) + } +} + +func TestOverridesModuleServiceInterface(t *testing.T) { + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: 
"test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Verify it implements the Service interface + require.Implements(t, (*services.Service)(nil), api) + + // Verify initial state + assert.Equal(t, services.New, api.State()) + + // Verify the service has the expected methods + // This is a basic check that the service was properly constructed + assert.NotNil(t, api.Service) +} + +// TestAPIEndpoints tests the actual HTTP API endpoints +func TestAPIEndpoints(t *testing.T) { + tests := []struct { + name string + method string + path string + tenantID string + requestBody any + expectedStatus int + setupMock func(*bucket.ClientMock) + validateResponse func(*testing.T, *httptest.ResponseRecorder) + }{ + { + name: "GET overrides - no tenant ID", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "", + expectedStatus: http.StatusBadRequest, + }, + { + name: "GET overrides - valid tenant ID, no overrides", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "user123", + expectedStatus: http.StatusNotFound, + setupMock: func(mock *bucket.ClientMock) { + // Mock that no overrides exist by passing empty content + mock.MockGet("runtime.yaml", "overrides:\n", nil) + }, + }, + { + name: "GET overrides - valid tenant ID, with overrides", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "user456", + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + overridesData := `overrides: + user456: + ingestion_rate: 5000 + max_global_series_per_user: 100000` + mock.MockGet("runtime.yaml", overridesData, nil) + }, + validateResponse: func(t *testing.T, recorder *httptest.ResponseRecorder) { + var response map[string]any + err := json.Unmarshal(recorder.Body.Bytes(), &response) + require.NoError(t, 
err) + assert.Equal(t, float64(5000), response["ingestion_rate"]) + assert.Equal(t, float64(100000), response["max_global_series_per_user"]) + }, + }, + { + name: "GET overrides - valid tenant ID, user does not exist", + method: "GET", + path: "/api/v1/user-overrides", + tenantID: "nonexistent_user", + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with different user + overridesData := `overrides: + other_user: + ingestion_rate: 5000` + mock.MockGet("runtime.yaml", overridesData, nil) + }, + }, + { + name: "POST overrides - no tenant ID", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "", + requestBody: map[string]any{"ingestion_rate": 5000}, + expectedStatus: http.StatusBadRequest, + }, + { + name: "POST overrides - valid tenant ID, valid overrides", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user789", + requestBody: map[string]any{"ingestion_rate": 5000, "ruler_max_rules_per_rule_group": 10}, + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits + runtimeConfig := `overrides: + user789: + ingestion_rate: 5000 + ruler_max_rules_per_rule_group: 10 +api_allowed_limits: + - ingestion_rate + - ruler_max_rules_per_rule_group + - max_global_series_per_user + - max_global_series_per_metric + - ingestion_burst_size + - ruler_max_rule_groups_per_tenant` + // Mock both reads: one for getAllowedLimitsFromBucket, one for setOverridesToBucket + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + { + name: "POST overrides - invalid limit name", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: map[string]any{"invalid_limit": 5000}, + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits 
(invalid_limit not included) + runtimeConfig := `api_allowed_limits: + - ingestion_rate + - ruler_max_rules_per_rule_group + - max_global_series_per_user + - max_global_series_per_metric + - ingestion_burst_size + - ruler_max_rule_groups_per_tenant` + mock.MockGet("runtime.yaml", runtimeConfig, nil) + }, + }, + + { + name: "POST overrides - invalid JSON", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: "invalid json", + expectedStatus: http.StatusBadRequest, + }, + { + name: "POST overrides - merge with existing overrides", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user888", + requestBody: map[string]any{"ingestion_rate": 8000}, // Only update ingestion_rate + expectedStatus: http.StatusOK, + setupMock: func(m *bucket.ClientMock) { + // Mock runtime config with existing overrides for user888 + initialConfig := `overrides: + user888: + ingestion_rate: 5000 + max_global_series_per_user: 100000 + ruler_max_rules_per_rule_group: 20 +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user + - ruler_max_rules_per_rule_group + - ingestion_burst_size` + // First read for getAllowedLimitsFromBucket + m.MockGet("runtime.yaml", initialConfig, nil) + // Second read for setOverridesToBucket to get existing overrides + m.MockGet("runtime.yaml", initialConfig, nil) + + // Mock upload and validate the merged content + m.On("Upload", mock.Anything, "runtime.yaml", mock.Anything, mock.Anything). + Return(nil). 
+ Run(func(args mock.Arguments) { + // Read the uploaded content + reader := args.Get(2).(io.Reader) + content, err := io.ReadAll(reader) + if err != nil { + panic(fmt.Sprintf("failed to read uploaded content: %v", err)) + } + + // Verify that the uploaded content has merged values + var config runtimeconfig.RuntimeConfigValues + if err := yaml.Unmarshal(content, &config); err != nil { + panic(fmt.Sprintf("failed to unmarshal uploaded config: %v", err)) + } + + // Check that all three fields are present + if config.TenantLimits == nil || config.TenantLimits["user888"] == nil { + panic("tenant limits for user888 not found") + } + + limits := config.TenantLimits["user888"] + + // Verify ingestion_rate was updated + if limits.IngestionRate != 8000 { + panic(fmt.Sprintf("expected ingestion_rate to be 8000, got %f", limits.IngestionRate)) + } + + // Verify max_global_series_per_user was preserved + if limits.MaxGlobalSeriesPerUser != 100000 { + panic(fmt.Sprintf("expected max_global_series_per_user to be preserved at 100000, got %d", limits.MaxGlobalSeriesPerUser)) + } + + // Verify ruler_max_rules_per_rule_group was preserved + if limits.RulerMaxRulesPerRuleGroup != 20 { + panic(fmt.Sprintf("expected ruler_max_rules_per_rule_group to be preserved at 20, got %d", limits.RulerMaxRulesPerRuleGroup)) + } + }) + }, + }, + { + name: "POST overrides - exceeding hard limit from runtime config", + method: "POST", + path: "/api/v1/user-overrides", + tenantID: "user999", + requestBody: map[string]any{"ingestion_rate": 1500000}, // Exceeds hard limit of 1000000 + expectedStatus: http.StatusBadRequest, + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with per-user hard limits and allowed limits + runtimeConfig := `overrides: + user999: + ingestion_rate: 1000 +hard_overrides: + user999: + ingestion_rate: 1000000 + max_global_series_per_user: 5000000 +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user` + // Mock all reads: one for 
getAllowedLimitsFromBucket, one for validateHardLimits, one for setOverridesToBucket + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + { + name: "DELETE overrides - no tenant ID", + method: "DELETE", + path: "/api/v1/user-overrides", + tenantID: "", + expectedStatus: http.StatusBadRequest, + }, + { + name: "DELETE overrides - valid tenant ID", + method: "DELETE", + path: "/api/v1/user-overrides", + tenantID: "user123", + expectedStatus: http.StatusOK, + setupMock: func(mock *bucket.ClientMock) { + // First read succeeds, then upload succeeds + mock.MockGet("runtime.yaml", "overrides:\n user123:\n ingestion_rate: 1000", nil) + mock.MockUpload("runtime.yaml", nil) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + if tt.setupMock != nil { + tt.setupMock(mockBucket) + } + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + var req *http.Request + if tt.requestBody != nil { + var body []byte + if str, ok := tt.requestBody.(string); ok { + body = []byte(str) + } else { + body, err = json.Marshal(tt.requestBody) + require.NoError(t, err) + } + req = 
httptest.NewRequest(tt.method, tt.path, bytes.NewReader(body)) + } else { + req = httptest.NewRequest(tt.method, tt.path, nil) + } + + // Add tenant ID header if provided + if tt.tenantID != "" { + req.Header.Set("X-Scope-OrgID", tt.tenantID) + } + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the appropriate handler based on method + switch tt.method { + case "GET": + api.GetOverrides(recorder, req) + case "POST": + api.SetOverrides(recorder, req) + case "DELETE": + api.DeleteOverrides(recorder, req) + default: + t.Fatalf("Unsupported method: %s", tt.method) + } + + // Assert status code + assert.Equal(t, tt.expectedStatus, recorder.Code) + + // Validate response if validation function provided + if tt.validateResponse != nil { + tt.validateResponse(t, recorder) + } + }) + } +} + +// TestAPITenantExtraction tests tenant ID extraction from various header formats +func TestAPITenantExtraction(t *testing.T) { + tests := []struct { + name string + headers map[string]string + expectedTenant string + expectStatus int + setupMock func(*bucket.ClientMock) + }{ + { + name: "X-Scope-OrgID header", + headers: map[string]string{"X-Scope-OrgID": "tenant1"}, + expectedTenant: "tenant1", + expectStatus: http.StatusNotFound, + setupMock: func(mock *bucket.ClientMock) { + // Mock successful get with empty overrides + mock.MockGet("runtime.yaml", "overrides:\n", nil) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + if tt.setupMock != nil { + tt.setupMock(mockBucket) + } + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before 
validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + req := httptest.NewRequest("GET", "/api/v1/user-overrides", nil) + for key, value := range tt.headers { + req.Header.Set(key, value) + } + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the handler + api.GetOverrides(recorder, req) + + // Assert based on expected behavior + assert.Equal(t, tt.expectStatus, recorder.Code) + }) + } +} + +// TestAPIBucketErrors tests how the API handles bucket operation errors +func TestAPIBucketErrors(t *testing.T) { + tests := []struct { + name string + method string + tenantID string + setupMock func(*bucket.ClientMock) + expectedStatus int + }{ + { + name: "GET overrides - bucket error returns internal server error", + method: "GET", + tenantID: "user123", + setupMock: func(mock *bucket.ClientMock) { + mock.MockGet("runtime.yaml", "", fmt.Errorf("bucket error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + { + name: "POST overrides - bucket upload error", + method: "POST", + tenantID: "user456", + setupMock: func(mock *bucket.ClientMock) { + // Mock runtime config with allowed limits + runtimeConfig := `overrides: + user456: + ingestion_rate: 1000 +api_allowed_limits: + - ingestion_rate + - max_global_series_per_user` + // First read succeeds (for allowed limits), then upload fails + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockGet("runtime.yaml", runtimeConfig, nil) + mock.MockUpload("runtime.yaml", fmt.Errorf("upload error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + { + name: "DELETE overrides - bucket delete error", + method: "DELETE", + tenantID: "user789", + setupMock: func(mock *bucket.ClientMock) { 
+ // First read succeeds, then upload fails + mock.MockGet("runtime.yaml", "overrides:\n user789:\n ingestion_rate: 1000", nil) + mock.MockUpload("runtime.yaml", fmt.Errorf("upload error")) + }, + expectedStatus: http.StatusInternalServerError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + tt.setupMock(mockBucket) + + // Create the API instance with proper configuration + cfg := runtimeconfig.Config{ + StorageConfig: bucket.Config{ + Backend: bucket.S3, + S3: s3.Config{ + AccessKeyID: "test-access-key", + SecretAccessKey: flagext.Secret{Value: "test-secret-key"}, + BucketName: "test-bucket", + Endpoint: "localhost:9000", + Insecure: true, + }, + }, + } + // Set default values before validation + flagext.DefaultValues(&cfg.StorageConfig.S3) + api, err := New(cfg, log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + require.NotNil(t, api) + + // Manually set the bucket client and runtime config path for testing + api.bucketClient = mockBucket + api.runtimeConfigPath = "runtime.yaml" + + // Create the request + var req *http.Request + if tt.method == "POST" { + requestBody := map[string]any{"ingestion_rate": 5000} + body, err := json.Marshal(requestBody) + require.NoError(t, err) + req = httptest.NewRequest(tt.method, "/api/v1/user-overrides", bytes.NewReader(body)) + } else { + req = httptest.NewRequest(tt.method, "/api/v1/user-overrides", nil) + } + + // Add tenant ID header + req.Header.Set("X-Scope-OrgID", tt.tenantID) + + // Create response recorder + recorder := httptest.NewRecorder() + + // Call the appropriate handler + switch tt.method { + case "GET": + api.GetOverrides(recorder, req) + case "POST": + api.SetOverrides(recorder, req) + case "DELETE": + api.DeleteOverrides(recorder, req) + } + + // Assert status code + assert.Equal(t, tt.expectedStatus, recorder.Code) + }) + } +} diff --git a/pkg/util/runtimeconfig/types.go 
// RuntimeConfigValues is the Go representation of the runtime configuration
// YAML document. The overrides API decodes the stored file into this type,
// edits it, and marshals it back, so the field tags below define the keys of
// that file.
type RuntimeConfigValues struct {
	// TenantLimits holds the per-tenant limit overrides, keyed by user ID
	// (YAML key "overrides").
	TenantLimits map[string]*validation.Limits `yaml:"overrides"`

	// Multi is stored under "multi_kv_config".
	// NOTE(review): not read by the overrides API code; presumably kept here so
	// the section survives a decode/re-encode round trip — confirm.
	Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

	// IngesterChunkStreaming is stored under
	// "ingester_stream_chunks_when_using_blocks"; a pointer so that an unset
	// value can be told apart from an explicit false.
	IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`

	// IngesterLimits is stored under "ingester_limits".
	IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"`

	// HardTenantLimits holds per-user upper bounds, keyed by user ID, that
	// validateHardLimits compares requested overrides against (YAML key
	// "hard_overrides", omitted when empty).
	HardTenantLimits map[string]*validation.Limits `yaml:"hard_overrides,omitempty"`

	// APIAllowedLimits lists limit names under "api_allowed_limits" (omitted
	// when empty).
	// NOTE(review): appears to be the allow-list of limits that may be changed
	// through the overrides API — confirm against the handler that reads it.
	APIAllowedLimits []string `yaml:"api_allowed_limits,omitempty"`
}