Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278
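* [ENHANCEMENT] Distributor: Add a `nhcb` value to the `type` label of the `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics to track native histogram custom buckets (NHCB) samples. NHCB samples are still also counted under `type="histogram"`.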
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
42 changes: 33 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
Expand Down Expand Up @@ -73,8 +74,9 @@ const (
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
mergeSlicesParallelism = 8

sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
sampleMetricTypeHistogramNHCB = "nhcb" // Native histogram with custom buckets schema
)

// Distributor is a storage.SampleAppender and a client.Querier which
Expand Down Expand Up @@ -521,10 +523,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogramNHCB)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
Expand Down Expand Up @@ -734,15 +738,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

numFloatSamples := 0
numHistogramSamples := 0
numNHCBSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numFloatSamples += len(ts.Samples)
nhcb := countNHCB(ts.Histograms)
numNHCBSamples += nhcb
numHistogramSamples += len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numHistogramSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(numNHCBSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
Expand Down Expand Up @@ -797,14 +805,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
if err != nil {
return nil, err
}
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)

d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogramNHCB).Add(float64(validatedNHCBSamples))
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

Expand Down Expand Up @@ -1012,7 +1021,18 @@ type samplesLabelSetEntry struct {
labels labels.Labels
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
// countNHCB reports how many entries of histograms use the custom-buckets
// schema, i.e. are native histograms with custom buckets (NHCB).
// A nil or empty slice yields 0.
func countNHCB(histograms []cortexpb.Histogram) int {
	total := 0
	for i := range histograms {
		// Skip every histogram whose schema is not the custom-buckets one.
		if !histogram.IsCustomBucketsSchema(histograms[i].GetSchema()) {
			continue
		}
		total++
	}
	return total
}

func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, int, error, error) {
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
defer pSpan.Finish()

Expand All @@ -1024,6 +1044,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
nhSeriesKeys := make([]uint32, 0, len(req.Timeseries))
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedNHCBSamples := 0
validatedExemplars := 0
limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)

Expand All @@ -1043,9 +1064,10 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
for _, ts := range req.Timeseries {
for i := range req.Timeseries {
ts := &req.Timeseries[i]
if len(ts.Labels) == 0 {
return nil, nil, nil, nil, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found")
return nil, nil, nil, nil, 0, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found")
}

if limits.AcceptHASamples && limits.AcceptMixedHASamples {
Expand Down Expand Up @@ -1148,9 +1170,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, nil, nil, nil, 0, 0, 0, nil, err
return nil, nil, nil, nil, 0, 0, 0, 0, nil, err
}
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
validatedSeries, validationErr := d.validateSeries(*ts, userID, skipLabelNameValidation, limits)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
Expand All @@ -1170,6 +1192,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// TODO: use pool.
labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits))
}
nhcb := countNHCB(ts.Histograms)
for _, l := range matchedLabelSetLimits {
if c, exists := labelSetCounters[l.Hash]; exists {
c.floatSamples += int64(len(ts.Samples))
Expand All @@ -1192,6 +1215,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedNHCBSamples += nhcb
validatedExemplars += len(ts.Exemplars)
}
for h, counter := range labelSetCounters {
Expand All @@ -1205,7 +1229,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}
}

return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedNHCBSamples, validatedExemplars, firstPartialErr, nil
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
Expand Down
59 changes: 58 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters int
samples samplesIn
histogramSamples bool
nhcbSamples int
metadata int
expectedResponse *cortexpb.WriteResponse
expectedError error
Expand Down Expand Up @@ -296,6 +297,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push to 2 happy ingesters should succeed, histograms": {
Expand All @@ -314,6 +316,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 5
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push to 1 happy ingesters should fail, histograms": {
Expand All @@ -331,6 +334,7 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 10
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push exceeding burst size should fail, histograms": {
Expand All @@ -349,6 +353,26 @@ func TestDistributor_Push(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 0
`,
},
"A push to 3 happy ingesters should succeed, NHCB histograms": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 0, startTimestampMs: 123456789000},
metadata: 2,
nhcbSamples: 4,
expectedResponse: emptyResponse,
metricNames: []string{lastSeenTimestamp, distributorReceivedSamples},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="userDistributorPush"} 123456789.003
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPush"} 0
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 4
cortex_distributor_received_samples_total{type="nhcb",user="userDistributorPush"} 4
`,
},
} {
Expand Down Expand Up @@ -376,7 +400,13 @@ func TestDistributor_Push(t *testing.T) {
})

var request *cortexpb.WriteRequest
if !tc.histogramSamples {
if tc.nhcbSamples > 0 {
if !tc.histogramSamples {
request = makeWriteRequestWithNHCB(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0, tc.nhcbSamples)
} else {
request = makeWriteRequestWithNHCB(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num, tc.nhcbSamples)
}
} else if !tc.histogramSamples {
request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0)
} else {
request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num)
Expand Down Expand Up @@ -434,6 +464,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.receivedSamples.WithLabelValues("userA", sampleMetricTypeFloat).Add(5)
d.receivedSamples.WithLabelValues("userB", sampleMetricTypeFloat).Add(10)
d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogram).Add(15)
d.receivedSamples.WithLabelValues("userC", sampleMetricTypeHistogramNHCB).Add(3)
d.receivedExemplars.WithLabelValues("userA").Add(5)
d.receivedExemplars.WithLabelValues("userB").Add(10)
d.receivedMetadata.WithLabelValues("userA").Add(5)
Expand Down Expand Up @@ -492,6 +523,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
cortex_distributor_received_samples_total{type="float",user="userA"} 5
cortex_distributor_received_samples_total{type="float",user="userB"} 10
cortex_distributor_received_samples_total{type="histogram",user="userC"} 15
cortex_distributor_received_samples_total{type="nhcb",user="userC"} 3
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -550,6 +582,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userB"} 10
cortex_distributor_received_samples_total{type="histogram",user="userC"} 15
cortex_distributor_received_samples_total{type="nhcb",user="userC"} 3
# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
Expand Down Expand Up @@ -3317,6 +3350,11 @@ func stopAll(ds []*Distributor, r *ring.Ring) {
}

// makeWriteRequest builds a write request that contains no NHCB timeseries.
// It delegates to makeWriteRequestWithNHCB with an NHCB count of zero.
func makeWriteRequest(startTimestampMs int64, samples int, metadata int, histograms int) *cortexpb.WriteRequest {
	const nhcbSeries = 0
	return makeWriteRequestWithNHCB(startTimestampMs, samples, metadata, histograms, nhcbSeries)
}

// makeWriteRequestWithNHCB builds a write request with optional NHCB (native histogram custom buckets) timeseries.
func makeWriteRequestWithNHCB(startTimestampMs int64, samples int, metadata int, histograms int, nhcb int) *cortexpb.WriteRequest {
request := &cortexpb.WriteRequest{}
for i := range samples {
request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries(
Expand All @@ -3336,6 +3374,16 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int, histogr
}, startTimestampMs+int64(i), int64(i), true))
}

for i := range nhcb {
ts := startTimestampMs + int64(i)
request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseriesNHCB(
[]cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "nhcb", Value: fmt.Sprintf("%d", i)},
}, ts, int64(i)))
}

for i := range metadata {
m := &cortexpb.MetricMetadata{
MetricFamilyName: fmt.Sprintf("metric_%d", i),
Expand Down Expand Up @@ -3365,6 +3413,15 @@ func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts, value int64,
return t
}

// makeWriteRequestTimeseriesNHCB returns a timeseries with the given labels and
// exactly one histogram: the result of tsdbutil.GenerateTestCustomBucketsHistogram(value)
// converted via cortexpb.HistogramToHistogramProto with ts as its first argument.
// No float samples or exemplars are set.
func makeWriteRequestTimeseriesNHCB(labels []cortexpb.LabelAdapter, ts, value int64) cortexpb.PreallocTimeseries {
	nhcb := tsdbutil.GenerateTestCustomBucketsHistogram(value)
	series := &cortexpb.TimeSeries{
		Labels:     labels,
		Histograms: []cortexpb.Histogram{cortexpb.HistogramToHistogramProto(ts, nhcb)},
	}
	return cortexpb.PreallocTimeseries{TimeSeries: series}
}

func makeWriteRequestHA(samples int, replica, cluster string, histogram bool) *cortexpb.WriteRequest {
request := &cortexpb.WriteRequest{}
for i := range samples {
Expand Down
Loading