diff --git a/CHANGELOG.md b/CHANGELOG.md index a9441f1ddc..79f928a907 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 +* [FEATURE] Ingester: Add experimental active series queried metric. #7173 * [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125 * [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092 * [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c10073e49b..260f9a40a6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3702,6 +3702,30 @@ lifecycler: # CLI flag: -ingester.active-series-metrics-idle-timeout [active_series_metrics_idle_timeout: | default = 10m] +# Enable tracking of active queried series using probabilistic data structure +# and export them as metrics. +# CLI flag: -ingester.active-queried-series-metrics-enabled +[active_queried_series_metrics_enabled: | default = false] + +# How often to update active queried series metrics. +# CLI flag: -ingester.active-queried-series-metrics-update-period +[active_queried_series_metrics_update_period: | default = 1m] + +# Duration of each sub-window for active queried series tracking (e.g., 1 +# minute). Used to divide the total tracking period into smaller windows. +# CLI flag: -ingester.active-queried-series-metrics-window-duration +[active_queried_series_metrics_window_duration: | default = 15m] + +# Sampling rate for active queried series tracking (1.0 = 100% sampling, 0.1 = +# 10% sampling). By default, all queries are sampled. +# CLI flag: -ingester.active-queried-series-metrics-sample-rate +[active_queried_series_metrics_sample_rate: | default = 1] + +# Time windows to expose queried series metric. Each window tracks queried +# series within that time period. +# CLI flag: -ingester.active-queried-series-metrics-windows +[active_queried_series_metrics_windows: | default = 2h0m0s] + # Enable uploading compacted blocks. # CLI flag: -ingester.upload-compacted-blocks-enabled [upload_compacted_blocks_enabled: | default = true] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 9cca951266..62f8649416 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -133,3 +133,9 @@ Currently experimental features are: - Distributor/Ingester: Stream push connection - Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor. - Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`) +- Ingester: Series Queried Metric + - Enable on Ingester via `-ingester.active-queried-series-metrics-enabled=true` + - Set the time window to expose via metrics using `-ingester.active-queried-series-metrics-windows=2h`. At least 1 time window is required to expose the metric. + - `-ingester.active-queried-series-metrics-update-period` metric update interval + - `-ingester.active-queried-series-metrics-window-duration` each HyperLogLog time window size + - `-ingester.active-queried-series-metrics-sample-rate` query sampling rate diff --git a/go.mod b/go.mod index 9a26035ca6..3a6a012cde 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.39.2 github.com/aws/aws-sdk-go-v2/config v1.31.12 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.50.1 + github.com/axiomhq/hyperloglog v0.2.6 github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 github.com/cespare/xxhash/v2 v2.3.0 github.com/go-openapi/swag/jsonutils v0.25.1 @@ -202,6 +203,7 @@ require ( github.com/jessevdk/go-flags v1.6.1 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect + github.com/kamstrup/intmap v0.5.2 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8dab5b15df..8bfaa75fbe 100644 --- a/go.sum +++ b/go.sum @@ -897,6 +897,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.38.6 h1:p3jIvqYwUZgu/XYeI48bJxOhvm47 github.com/aws/aws-sdk-go-v2/service/sts v1.38.6/go.mod h1:WtKK+ppze5yKPkZ0XwqIVWD4beCwv056ZbPQNoeHqM8= github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE= github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/axiomhq/hyperloglog v0.2.6 h1:sRhvvF3RIXWQgAXaTphLp4yJiX4S0IN3MWTaAgZoRJw= +github.com/axiomhq/hyperloglog v0.2.6/go.mod h1:YjX/dQqCR/7QYX0g8mu8UZAjpIenz1FKM71UEsjFoTo= github.com/baidubce/bce-sdk-go v0.9.230 h1:HzELBKiD7QAgYqZ1qHZexoI2A3Lo/6zYGQFvcUbS5cA= github.com/baidubce/bce-sdk-go v0.9.230/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= @@ -1422,6 +1424,8 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/kamstrup/intmap v0.5.2 h1:qnwBm1mh4XAnW9W9Ue9tZtTff8pS6+s6iKF6JRIV2Dk= +github.com/kamstrup/intmap v0.5.2/go.mod h1:gWUVWHKzWj8xpJVFf5GC0O26bWmv3GqdnIX/LMT6Aq4= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU= github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k= diff --git a/pkg/ingester/active_queried_series.go b/pkg/ingester/active_queried_series.go new file mode 100644 index 0000000000..2e28d44433 --- /dev/null +++ b/pkg/ingester/active_queried_series.go @@ -0,0 +1,467 @@ +package ingester + +import ( + "context" + "fmt" + "math/rand" + "runtime" + "sync" + "time" + + "github.com/axiomhq/hyperloglog" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/util/services" +) + +// ActiveQueriedSeries tracks unique queried series using time-windowed HyperLogLog. +// It maintains multiple HyperLogLog sketches in a circular buffer, one per time window. +// It can track up to the maximum configured window duration and query for specific window durations. +type ActiveQueriedSeries struct { + windowDuration time.Duration + numWindows int + windows []*hllWindow + currentWindow int + sampleRate float64 + rng *rand.Rand + logger log.Logger + mu sync.RWMutex + + // Cache for merged HLL results per query window to avoid re-merging unchanged windows + cache map[time.Duration]*mergedCacheEntry +} + +// mergedCacheEntry caches a merged HLL result for a specific query window duration. +type mergedCacheEntry struct { + mergedHLL *hyperloglog.Sketch + lastMergedEndTime time.Time // endTime of the latest window included in the merge +} + +// hllWindow represents a single time window with its HyperLogLog sketch. +type hllWindow struct { + hll *hyperloglog.Sketch + startTime time.Time + endTime time.Time +} + +// NewActiveQueriedSeries creates a new ActiveQueriedSeries tracker. +func NewActiveQueriedSeries(windowsToQuery []time.Duration, windowDuration time.Duration, sampleRate float64, logger log.Logger) *ActiveQueriedSeries { + // Determine the maximum window duration to track + // windowDurations are assumed to be validated (> 0 and non-empty) by Config.Validate + maxWindow := time.Duration(0) + for _, d := range windowsToQuery { + if d > maxWindow { + maxWindow = d + } + } + + // Calculate number of windows needed for the maximum duration + numWindows := int(maxWindow / windowDuration) + if numWindows <= 0 { + numWindows = 1 // At least 1 window + } + + windows := make([]*hllWindow, numWindows) + now := time.Now() + for i := range numWindows { + windows[i] = &hllWindow{ + hll: hyperloglog.New14(), + startTime: now.Add(time.Duration(i-numWindows) * windowDuration), + endTime: now.Add(time.Duration(i-numWindows+1) * windowDuration), + } + } + + return &ActiveQueriedSeries{ + windowDuration: windowDuration, + numWindows: numWindows, + windows: windows, + currentWindow: numWindows - 1, // Start with the most recent window + sampleRate: sampleRate, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + logger: logger, + cache: make(map[time.Duration]*mergedCacheEntry), + } +} + +// SampleRequest returns whether this request should be sampled based on sampling. +// This should be called before collecting hashes to avoid unnecessary work. +func (a *ActiveQueriedSeries) SampleRequest() bool { + if a.sampleRate >= 1.0 { + return true // 100% sampling, always track + } + return a.rng.Float64() <= a.sampleRate +} + +// UpdateSeriesBatch adds multiple series hashes to the current active window in a single batch. +// This is more efficient than calling UpdateSeries multiple times as it: +// - Only acquires the lock once +// - Only rotates windows once +// Note: This method should be called from the centralized worker goroutines. +// Sampling should be checked before calling this method. +func (a *ActiveQueriedSeries) UpdateSeriesBatch(hashes []uint64, now time.Time) { + if len(hashes) == 0 { + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + // Rotate windows if needed + a.rotateWindowsLocked(now) + + // Add all hashes to current window + window := a.windows[a.currentWindow] + if now.After(window.startTime) && now.Before(window.endTime) || now.Equal(window.startTime) { + for _, hash := range hashes { + window.hll.InsertHash(hash) + } + } +} + +// GetSeriesQueried returns the estimated cardinality of active queried series +// by merging all non-expired windows within the specified time range. +// If queryWindow is 0, it uses the full tracking period. +// This method uses caching to efficiently merge only new windows when possible. +func (a *ActiveQueriedSeries) GetSeriesQueried(now time.Time, queryWindow time.Duration) (uint64, error) { + a.mu.Lock() + defer a.mu.Unlock() + + // Determine the cutoff time based on query window or full tracking period + var cutoffTime time.Time + if queryWindow > 0 { + cutoffTime = now.Add(-queryWindow) + } else { + cutoffTime = now.Add(-time.Duration(a.numWindows) * a.windowDuration) + } + + // Get the current window (which is actively being updated and should not be cached) + currentWindow := a.windows[a.currentWindow] + currentWindowEndTime := currentWindow.endTime + + // The latest completed window is the one before the current window in the circular buffer. + // This is guaranteed to be in range because query windows are validated to be larger than windowDuration. + prevWindowIndex := (a.currentWindow - 1 + a.numWindows) % a.numWindows + prevWindow := a.windows[prevWindowIndex] + + // Determine the latest completed window endTime + // The previous window should be completed and within the query range + // Note: We use >= for cutoffTime to handle the case where queryWindow equals windowDuration + var latestCompletedEndTime time.Time + if (prevWindow.endTime.After(cutoffTime) || prevWindow.endTime.Equal(cutoffTime)) && prevWindow.endTime.Before(now) { + latestCompletedEndTime = prevWindow.endTime + } + // If previous window is not in range (shouldn't happen with validation, but handle gracefully), + // latestCompletedEndTime will be zero, which will trigger a full merge + + // Check if we have a cached result for this query window + cached, hasCache := a.cache[queryWindow] + + // Check if cache is still valid (no new completed windows since last merge) + if hasCache && !latestCompletedEndTime.After(cached.lastMergedEndTime) { + // Cache is valid - merge cached result with current window + merged := cached.mergedHLL.Clone() + + // Always merge the current window (which is actively being updated) + if currentWindow.endTime.After(cutoffTime) && + (currentWindow.startTime.Before(now) || currentWindow.startTime.Equal(now)) { + if err := merged.Merge(currentWindow.hll); err != nil { + if a.logger != nil { + level.Error(a.logger).Log("msg", "failed to merge HyperLogLog sketches", "err", err) + } + return 0, fmt.Errorf("failed to merge HyperLogLog sketches: %w", err) + } + } + + return merged.Estimate(), nil + } + + // Cache needs update - incrementally merge only new completed windows if cache exists + if hasCache && latestCompletedEndTime.After(cached.lastMergedEndTime) { + // Clone the cached merged HLL + merged := cached.mergedHLL.Clone() + + // Find and merge only new completed windows (excluding current window) + for _, window := range a.windows { + // Include windows that: + // 1. End after the cutoff time + // 2. End before 'now' (completed windows only) + // 3. End after the last merged endTime (new completed windows) + // 4. Not the current window + if window.endTime.After(cutoffTime) && + window.endTime.Before(now) && + window.endTime.After(cached.lastMergedEndTime) && + window.endTime != currentWindowEndTime { + if err := merged.Merge(window.hll); err != nil { + if a.logger != nil { + level.Error(a.logger).Log("msg", "failed to merge HyperLogLog sketches", "err", err) + } + return 0, fmt.Errorf("failed to merge HyperLogLog sketches: %w", err) + } + } + } + + // Update cache with new merged result (only completed windows) + a.cache[queryWindow] = &mergedCacheEntry{ + mergedHLL: merged, + lastMergedEndTime: latestCompletedEndTime, + } + + // Always merge the current window before returning + if currentWindow.endTime.After(cutoffTime) && + (currentWindow.startTime.Before(now) || currentWindow.startTime.Equal(now)) { + if err := merged.Merge(currentWindow.hll); err != nil { + if a.logger != nil { + level.Error(a.logger).Log("msg", "failed to merge HyperLogLog sketches", "err", err) + } + return 0, fmt.Errorf("failed to merge HyperLogLog sketches: %w", err) + } + } + + return merged.Estimate(), nil + } + + // No cache or cache invalid - do full merge of completed windows only + var merged *hyperloglog.Sketch + activeWindows := 0 + var latestCompletedEndTimeForCache time.Time + + for _, window := range a.windows { + // Include only completed windows (endTime < now) that are in range + // Exclude the current window which is still being updated + if window.endTime.After(cutoffTime) && + window.endTime.Before(now) && + window.endTime != currentWindowEndTime { + // Window is within the query range and completed, merge it + if merged == nil { + // Clone the first window to avoid modifying it + merged = window.hll.Clone() + } else { + // Merge into the existing merged sketch + if err := merged.Merge(window.hll); err != nil { + // Log the error and return 0 with the error + if a.logger != nil { + level.Error(a.logger).Log("msg", "failed to merge HyperLogLog sketches", "err", err) + } + return 0, fmt.Errorf("failed to merge HyperLogLog sketches: %w", err) + } + } + activeWindows++ + // Track the latest completed endTime + if window.endTime.After(latestCompletedEndTimeForCache) { + latestCompletedEndTimeForCache = window.endTime + } + } + } + + // Cache the merged result (only completed windows) + if merged != nil { + a.cache[queryWindow] = &mergedCacheEntry{ + mergedHLL: merged, + lastMergedEndTime: latestCompletedEndTimeForCache, + } + } + + // Always merge the current window before returning + if currentWindow.endTime.After(cutoffTime) && + (currentWindow.startTime.Before(now) || currentWindow.startTime.Equal(now)) { + if merged == nil { + // No completed windows, start with current window + merged = currentWindow.hll.Clone() + } else { + // Merge current window into completed windows + if err := merged.Merge(currentWindow.hll); err != nil { + if a.logger != nil { + level.Error(a.logger).Log("msg", "failed to merge HyperLogLog sketches", "err", err) + } + return 0, fmt.Errorf("failed to merge HyperLogLog sketches: %w", err) + } + } + } + + if merged == nil { + return 0, nil + } + + return merged.Estimate(), nil +} + +// Purge rotates expired windows and clears them. +func (a *ActiveQueriedSeries) Purge(now time.Time) { + a.mu.Lock() + defer a.mu.Unlock() + + a.rotateWindowsLocked(now) +} + +// rotateWindowsLocked ensures we're using the correct window for the current time. +// Must be called with write lock held. +func (a *ActiveQueriedSeries) rotateWindowsLocked(now time.Time) { + currentWindow := a.windows[a.currentWindow] + + // Check if current window has expired + if now.After(currentWindow.endTime) || now.Equal(currentWindow.endTime) { + // Calculate how many windows we need to advance + timeSinceWindowStart := now.Sub(currentWindow.startTime) + windowsToAdvance := int(timeSinceWindowStart / a.windowDuration) + + if windowsToAdvance >= a.numWindows { + // All windows are expired, reset all + for i := range a.numWindows { + windowStart := now.Add(time.Duration(i-a.numWindows+1) * a.windowDuration) + a.windows[i] = &hllWindow{ + hll: hyperloglog.New14(), + startTime: windowStart, + endTime: windowStart.Add(a.windowDuration), + } + } + a.currentWindow = a.numWindows - 1 + // Invalidate cache since all windows were reset + a.cache = make(map[time.Duration]*mergedCacheEntry) + } else { + // Advance by the required number of windows + for i := range windowsToAdvance { + // Move to next window + a.currentWindow = (a.currentWindow + 1) % a.numWindows + + // Reset the window we're about to use + windowStart := now.Add(time.Duration(i-windowsToAdvance+1) * a.windowDuration) + a.windows[a.currentWindow] = &hllWindow{ + hll: hyperloglog.New14(), + startTime: windowStart, + endTime: windowStart.Add(a.windowDuration), + } + } + // Invalidate cache since windows were rotated + a.cache = make(map[time.Duration]*mergedCacheEntry) + } + } +} + +// clear resets all windows (used for testing and cleanup). +func (a *ActiveQueriedSeries) clear() { + a.mu.Lock() + defer a.mu.Unlock() + + now := time.Now() + for i := range a.numWindows { + windowStart := now.Add(time.Duration(i-a.numWindows+1) * a.windowDuration) + a.windows[i] = &hllWindow{ + hll: hyperloglog.New14(), + startTime: windowStart, + endTime: windowStart.Add(a.windowDuration), + } + } + a.currentWindow = a.numWindows - 1 + // Clear cache + a.cache = make(map[time.Duration]*mergedCacheEntry) +} + +// activeQueriedSeriesUpdate represents an update batch for a specific ActiveQueriedSeries instance. +type activeQueriedSeriesUpdate struct { + activeQueriedSeries *ActiveQueriedSeries + hashes []uint64 + now time.Time +} + +// ActiveQueriedSeriesService manages centralized worker goroutines for processing active queried series updates. +// It implements the services.Service interface to handle lifecycle management. +type ActiveQueriedSeriesService struct { + *services.BasicService + + updateChan chan activeQueriedSeriesUpdate + workers sync.WaitGroup + logger log.Logger + numWorkers int + droppedUpdatesTotal prometheus.Counter +} + +// NewActiveQueriedSeriesService creates a new ActiveQueriedSeriesService service. +func NewActiveQueriedSeriesService(logger log.Logger, registerer prometheus.Registerer) *ActiveQueriedSeriesService { + // Cap at 4 workers to avoid excessive goroutines + numWorkers := max(min(runtime.NumCPU()/2, 4), 1) + + m := &ActiveQueriedSeriesService{ + updateChan: make(chan activeQueriedSeriesUpdate, 10000), // Buffered channel to avoid blocking + logger: logger, + numWorkers: numWorkers, + droppedUpdatesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_active_queried_series_updates_dropped_total", + Help: "The total number of active queried series updates that were dropped due to full channel.", + }), + } + + m.BasicService = services.NewBasicService(m.starting, m.running, m.stopping) + return m +} + +// starting initializes the worker goroutines. +func (m *ActiveQueriedSeriesService) starting(ctx context.Context) error { + // Start worker goroutines + for w := 0; w < m.numWorkers; w++ { + m.workers.Add(1) + go m.processUpdates(ctx) + } + level.Info(m.logger).Log("msg", "started active queried series worker goroutines", "workers", m.numWorkers) + return nil +} + +// running keeps the service running until context is canceled. +func (m *ActiveQueriedSeriesService) running(ctx context.Context) error { + // Wait for context to be canceled (service is stopping) + <-ctx.Done() + return nil +} + +// stopping waits for all worker goroutines to finish. +func (m *ActiveQueriedSeriesService) stopping(_ error) error { + // Close the channel to signal workers to stop + close(m.updateChan) + // Wait for all workers to finish + m.workers.Wait() + return nil +} + +// processUpdates is a worker goroutine that processes updates from the update channel. +func (m *ActiveQueriedSeriesService) processUpdates(ctx context.Context) { + defer m.workers.Done() + + for { + select { + case <-ctx.Done(): + return + case update, ok := <-m.updateChan: + if !ok { + // Channel closed, exit + return + } + // Process the update synchronously + update.activeQueriedSeries.UpdateSeriesBatch(update.hashes, update.now) + } + } +} + +// UpdateSeriesBatch sends an update to the update channel for processing. +// This method is non-blocking and will drop updates if the channel is full. +func (m *ActiveQueriedSeriesService) UpdateSeriesBatch(activeQueriedSeries *ActiveQueriedSeries, hashes []uint64, now time.Time, userID string) { + if len(hashes) == 0 { + return + } + + // Non-blocking send to centralized update channel + select { + case m.updateChan <- activeQueriedSeriesUpdate{ + activeQueriedSeries: activeQueriedSeries, + hashes: hashes, + now: now, + }: + // Successfully queued + default: + // Channel is full, drop the update to avoid blocking + // This is acceptable as we're using probabilistic data structures (HLL) + m.droppedUpdatesTotal.Inc() + level.Warn(m.logger).Log("msg", "active queried series update channel full, dropping batch", "batch_size", len(hashes), "user", userID) + } +} diff --git a/pkg/ingester/active_queried_series_test.go b/pkg/ingester/active_queried_series_test.go new file mode 100644 index 0000000000..7b0392f05b --- /dev/null +++ b/pkg/ingester/active_queried_series_test.go @@ -0,0 +1,455 @@ +package ingester + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" +) + +func TestActiveQueriedSeries_UpdateSeries(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + now := time.Now() + ls1 := labels.FromStrings("a", "1") + ls2 := labels.FromStrings("a", "2") + + // Initially should be 0 + count, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), count) + + // Add first series + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, now) + active, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Greater(t, active, uint64(0), "Should have at least 1 series") + assert.LessOrEqual(t, active, uint64(2), "HyperLogLog estimate should be close to 1") + + // Add same series again (should still be ~1) + a.UpdateSeriesBatch([]uint64{hash1}, now) + active, err = a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Greater(t, active, uint64(0)) + assert.LessOrEqual(t, active, uint64(2), "Duplicate series should not significantly increase count") + + // Add different series + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, now) + active, err = a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.GreaterOrEqual(t, active, uint64(1), "Should have at least 1 series") + assert.LessOrEqual(t, active, uint64(3), "HyperLogLog estimate should be close to 2") +} + +func TestActiveQueriedSeries_MultipleSeries(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + now := time.Now() + numSeries := 100 + + // Add many different series + hashes := make([]uint64, 0, numSeries) + for i := range numSeries { + ls := labels.FromStrings("metric", "value", "index", fmt.Sprintf("%d", i)) + hash := ls.Hash() + hashes = append(hashes, hash) + } + a.UpdateSeriesBatch(hashes, now) + + active, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + // HyperLogLog with precision 14 has ~0.81% error, so we allow some variance + // For 100 series, estimate should be between ~90 and ~110 + assert.Greater(t, active, uint64(80), "Should estimate at least 80 series") + assert.Less(t, active, uint64(120), "Should estimate at most 120 series") +} + +func TestActiveQueriedSeries_WindowRotation(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series in first window + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + count, err := a.GetSeriesQueried(baseTime, 0) + assert.NoError(t, err) + assert.Greater(t, count, uint64(0)) + + // Advance time by one window duration + time1 := baseTime.Add(windowDuration) + ls2 := labels.FromStrings("a", "2") + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, time1) + + // Both series should still be active (within tracking period) + active, err := a.GetSeriesQueried(time1, 0) + assert.NoError(t, err) + assert.GreaterOrEqual(t, active, uint64(1), "Should have at least 1 series") + assert.LessOrEqual(t, active, uint64(3), "Should have at most 3 series (allowing for HLL variance)") + + // Advance time beyond all windows + time2 := baseTime.Add(time.Duration(numWindows+1) * windowDuration) + a.Purge(time2) + + // All windows should be expired + active, err = a.GetSeriesQueried(time2, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), active, "All windows should be expired") +} + +func TestActiveQueriedSeries_Purge(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series in first window + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + + // Add series in second window + time1 := baseTime.Add(windowDuration) + ls2 := labels.FromStrings("a", "2") + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, time1) + + // Both should be active + active, err := a.GetSeriesQueried(time1, 0) + assert.NoError(t, err) + assert.Greater(t, active, uint64(0)) + + // Purge at time that expires first window but not second + time2 := baseTime.Add(time.Duration(numWindows) * windowDuration) + a.Purge(time2) + + // First window should be expired, but second should still be active + active, err = a.GetSeriesQueried(time2, 0) + assert.NoError(t, err) + // The estimate might be 0 or 1 depending on which windows are still active + assert.GreaterOrEqual(t, active, uint64(0)) +} + +func TestActiveQueriedSeries_TimeExpiration(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 2 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series at base time + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + + // Add series one window later + time1 := baseTime.Add(windowDuration) + ls2 := labels.FromStrings("a", "2") + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, time1) + + // Both should be active + active, err := a.GetSeriesQueried(time1, 0) + assert.NoError(t, err) + assert.Greater(t, active, uint64(0)) + + // Advance time beyond tracking period + time2 := baseTime.Add(time.Duration(numWindows+1) * windowDuration) + a.Purge(time2) + + // All should be expired + active, err = a.GetSeriesQueried(time2, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), active, "All series should be expired") +} + +func TestActiveQueriedSeries_DefaultValues(t *testing.T) { + // Test with valid minimal values + // Note: Invalid values (zero/negative) are now validated at the Config level, + // so this test verifies that the function works with valid minimal inputs. + windowDuration := 1 * time.Minute + a1 := NewActiveQueriedSeries([]time.Duration{windowDuration}, windowDuration, 1.0, log.NewNopLogger()) + assert.NotNil(t, a1) + count, err := a1.GetSeriesQueried(time.Now(), 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), count) + + // Test with minimum sample rate + a2 := NewActiveQueriedSeries([]time.Duration{windowDuration}, windowDuration, 0.1, log.NewNopLogger()) + assert.NotNil(t, a2) + count, err = a2.GetSeriesQueried(time.Now(), 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), count) +} + +func TestActiveQueriedSeries_ConcurrentUpdates(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 10 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + now := time.Now() + numGoroutines := 50 + numSeriesPerGoroutine := 10 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := range numGoroutines { + go func(goroutineID int) { + defer wg.Done() + for j := range numSeriesPerGoroutine { + ls := labels.FromStrings("metric", "value", "goroutine", fmt.Sprintf("%d", goroutineID), "series", fmt.Sprintf("%d", j)) + hash := ls.Hash() + a.UpdateSeriesBatch([]uint64{hash}, now) + } + }(i) + } + + wg.Wait() + + // Check that we got a reasonable estimate + active, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + expectedMin := uint64(numGoroutines * numSeriesPerGoroutine / 2) // Allow for 50% variance + expectedMax := uint64(numGoroutines * numSeriesPerGoroutine * 2) // Allow for 2x variance + assert.Greater(t, active, expectedMin, "Concurrent updates should be tracked") + assert.Less(t, active, expectedMax, "Estimate should not be too high") +} + +func TestActiveQueriedSeries_Clear(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + now := time.Now() + + // Add some series + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, now) + + ls2 := labels.FromStrings("a", "2") + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, now) + + count, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Greater(t, count, uint64(0)) + + // Clear and verify + a.clear() + count, err = a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), count, "After clear, should have 0 active series") +} + +func TestActiveQueriedSeries_EmptyWindows(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + now := time.Now() + + // Should return 0 for empty tracker + count, err := a.GetSeriesQueried(now, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), count) +} + +func TestActiveQueriedSeries_WindowBoundaries(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 2 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series exactly at window start + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + + // Add series exactly at window end (should be in next window) + time1 := baseTime.Add(windowDuration) + ls2 := labels.FromStrings("a", "2") + hash2 := ls2.Hash() + a.UpdateSeriesBatch([]uint64{hash2}, time1) + + // Both should be tracked + active, err := a.GetSeriesQueried(time1, 0) + assert.NoError(t, err) + assert.Greater(t, active, uint64(0)) +} + +func TestActiveQueriedSeries_ManyWindows(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 10 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series across multiple windows + for i := range numWindows { + ls := labels.FromStrings("metric", "value", "window", fmt.Sprintf("%d", i)) + hash := ls.Hash() + windowTime := baseTime.Add(time.Duration(i) * windowDuration) + a.UpdateSeriesBatch([]uint64{hash}, windowTime) + } + + // All should be active + lastTime := baseTime.Add(time.Duration(numWindows-1) * windowDuration) + active, err := a.GetSeriesQueried(lastTime, 0) + assert.NoError(t, err) + assert.GreaterOrEqual(t, active, uint64(1), "Should have at least some series active") +} + +func TestActiveQueriedSeries_Accuracy(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + // Test with known number of unique series + testCases := []struct { + name string + numSeries int + expectedMin uint64 + expectedMax uint64 + }{ + {"10 series", 10, 8, 15}, + {"50 series", 50, 40, 65}, + {"100 series", 100, 85, 120}, + {"500 series", 500, 450, 550}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + a.clear() + // Use a fresh timestamp after clearing to ensure it's within the new window range + testNow := time.Now() + for i := 0; i < tc.numSeries; i++ { + ls := labels.FromStrings("metric", "value", "id", fmt.Sprintf("%d", i)) + hash := ls.Hash() + a.UpdateSeriesBatch([]uint64{hash}, testNow) + } + + active, err := a.GetSeriesQueried(testNow, 0) + assert.NoError(t, err) + assert.GreaterOrEqual(t, active, tc.expectedMin, + "Estimate should be at least %d for %d series", tc.expectedMin, tc.numSeries) + assert.LessOrEqual(t, active, tc.expectedMax, + "Estimate should be at most %d for %d series", tc.expectedMax, tc.numSeries) + }) + } +} + +func TestActiveQueriedSeries_RapidRotation(t *testing.T) { + windowDuration := 100 * time.Millisecond + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add series in first window + ls1 := labels.FromStrings("a", "1") + hash1 := ls1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + + // Rapidly advance through windows + for i := 1; i <= numWindows+1; i++ { + windowTime := baseTime.Add(time.Duration(i) * windowDuration) + ls := labels.FromStrings("a", fmt.Sprintf("%d", i)) + hash := ls.Hash() + a.UpdateSeriesBatch([]uint64{hash}, windowTime) + a.Purge(windowTime) + } + + // After rapid rotation, only recent windows should have data + finalTime := baseTime.Add(time.Duration(numWindows+1) * windowDuration) + active, err := a.GetSeriesQueried(finalTime, 0) + assert.NoError(t, err) + // Should have at least the most recent series + assert.GreaterOrEqual(t, active, uint64(0)) +} + +func TestActiveQueriedSeries_MergeWindows(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Add different series to different windows + series1 := labels.FromStrings("metric", "value1") + hash1 := series1.Hash() + a.UpdateSeriesBatch([]uint64{hash1}, baseTime) + + series2 := labels.FromStrings("metric", "value2") + hash2 := series2.Hash() + time1 := baseTime.Add(windowDuration) + a.UpdateSeriesBatch([]uint64{hash2}, time1) + + series3 := labels.FromStrings("metric", "value3") + hash3 := series3.Hash() + time2 := baseTime.Add(2 * windowDuration) + a.UpdateSeriesBatch([]uint64{hash3}, time2) + + // All three should be merged and counted + active, err := a.GetSeriesQueried(time2, 0) + assert.NoError(t, err) + // HyperLogLog should estimate around 3, allowing for variance + assert.GreaterOrEqual(t, active, uint64(2), "Should estimate at least 2 unique series") + assert.LessOrEqual(t, active, uint64(5), "Should estimate at most 5 unique series") +} + +func TestActiveQueriedSeries_EdgeCaseTimes(t *testing.T) { + windowDuration := 1 * time.Minute + numWindows := 3 + totalDuration := time.Duration(numWindows) * windowDuration + a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger()) + + baseTime := time.Now() + + // Test with time exactly at window boundary + ls := labels.FromStrings("a", "1") + hash := ls.Hash() + a.UpdateSeriesBatch([]uint64{hash}, baseTime) + + // Query at exact boundary + active, err := a.GetSeriesQueried(baseTime, 0) + assert.NoError(t, err) + assert.GreaterOrEqual(t, active, uint64(0)) + + // Test with time before base + pastTime := baseTime.Add(-1 * time.Hour) + active, err = a.GetSeriesQueried(pastTime, 0) + assert.NoError(t, err) + // Should handle gracefully + assert.GreaterOrEqual(t, active, uint64(0)) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b3ffdc22f3..acb8bac27a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -124,6 +124,12 @@ type Config struct { ActiveSeriesMetricsUpdatePeriod time.Duration `yaml:"active_series_metrics_update_period"` ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"` + ActiveQueriedSeriesMetricsEnabled bool `yaml:"active_queried_series_metrics_enabled"` + ActiveQueriedSeriesMetricsUpdatePeriod time.Duration `yaml:"active_queried_series_metrics_update_period"` + ActiveQueriedSeriesMetricsWindowDuration time.Duration `yaml:"active_queried_series_metrics_window_duration"` + ActiveQueriedSeriesMetricsSampleRate float64 `yaml:"active_queried_series_metrics_sample_rate"` + ActiveQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"active_queried_series_metrics_windows"` + // Use blocks storage. BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"` @@ -181,6 +187,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.") f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.") + f.BoolVar(&cfg.ActiveQueriedSeriesMetricsEnabled, "ingester.active-queried-series-metrics-enabled", false, "Enable tracking of active queried series using probabilistic data structure and export them as metrics.") + f.DurationVar(&cfg.ActiveQueriedSeriesMetricsUpdatePeriod, "ingester.active-queried-series-metrics-update-period", 1*time.Minute, "How often to update active queried series metrics.") + f.DurationVar(&cfg.ActiveQueriedSeriesMetricsWindowDuration, "ingester.active-queried-series-metrics-window-duration", 15*time.Minute, "Duration of each sub-window for active queried series tracking (e.g., 1 minute). Used to divide the total tracking period into smaller windows.") + f.Float64Var(&cfg.ActiveQueriedSeriesMetricsSampleRate, "ingester.active-queried-series-metrics-sample-rate", 1.0, "Sampling rate for active queried series tracking (1.0 = 100% sampling, 0.1 = 10% sampling). By default, all queries are sampled.") + cfg.ActiveQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{2 * time.Hour} + f.Var(&cfg.ActiveQueriedSeriesMetricsWindows, "ingester.active-queried-series-metrics-windows", "Time windows to expose queried series metric. Each window tracks queried series within that time period.") + f.BoolVar(&cfg.UploadCompactedBlocksEnabled, "ingester.upload-compacted-blocks-enabled", true, "Enable uploading compacted blocks.") f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.") f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors") @@ -208,6 +221,37 @@ func (cfg *Config) Validate(monitoredResources flagext.StringSliceCSV) error { return err } + // Validate active queried series metrics windows + if cfg.ActiveQueriedSeriesMetricsEnabled { + if len(cfg.ActiveQueriedSeriesMetricsWindows) == 0 { + return fmt.Errorf("active queried series metrics windows must be configured when enabled") + } + + // Validate window duration + if cfg.ActiveQueriedSeriesMetricsWindowDuration <= 0 { + return fmt.Errorf("active queried series metrics sub-window duration must be > 0, got %v", cfg.ActiveQueriedSeriesMetricsWindowDuration) + } + + // Validate sample rate + if cfg.ActiveQueriedSeriesMetricsSampleRate <= 0 { + return fmt.Errorf("active queried series metrics sample rate must be > 0, got %v", cfg.ActiveQueriedSeriesMetricsSampleRate) + } + if cfg.ActiveQueriedSeriesMetricsSampleRate > 1.0 { + return fmt.Errorf("active queried series metrics sample rate must be <= 1.0, got %v", cfg.ActiveQueriedSeriesMetricsSampleRate) + } + + for _, window := range cfg.ActiveQueriedSeriesMetricsWindows { + if window <= 0 { + return fmt.Errorf("active queried series metrics window duration must be > 0, got %v", window) + } + + // Query window duration must be at least as large as the window duration. + if window < cfg.ActiveQueriedSeriesMetricsWindowDuration { + return fmt.Errorf("active queried series metrics window duration (%v) must be at least as large as sub-window duration (%v)", window, cfg.ActiveQueriedSeriesMetricsWindowDuration) + } + } + } + return nil } @@ -270,6 +314,8 @@ type Ingester struct { matchersCache storecache.MatchersCache expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory + + activeQueriedSeriesService *ActiveQueriedSeriesService } // Shipper interface is used to have an easy way to mock it in tests. @@ -309,12 +355,13 @@ func (r tsdbCloseCheckResult) shouldClose() bool { } type userTSDB struct { - db *tsdb.DB - userID string - activeSeries *ActiveSeries - seriesInMetric *metricCounter - labelSetCounter *labelSetCounter - limiter *Limiter + db *tsdb.DB + userID string + activeSeries *ActiveSeries + activeQueriedSeries *ActiveQueriedSeries + seriesInMetric *metricCounter + labelSetCounter *labelSetCounter + limiter *Limiter instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. instanceLimitsFn func() *InstanceLimits @@ -743,6 +790,10 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe matchersCache: storecache.NoopMatchersCache, } + if cfg.ActiveQueriedSeriesMetricsEnabled { + i.activeQueriedSeriesService = NewActiveQueriedSeriesService(logger, registerer) + } + if cfg.MatchersCacheMaxItems > 0 { r := prometheus.NewRegistry() registerer.MustRegister(cortex_tsdb.NewMatchCacheMetrics("cortex_ingester", r, logger)) @@ -755,6 +806,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, + cfg.ActiveQueriedSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.maxInflightPushRequests, @@ -849,6 +901,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe cfg.AdminLimitMessage, ) i.metrics = newIngesterMetrics(registerer, + false, false, false, i.getInstanceLimits, @@ -899,6 +952,11 @@ func (i *Ingester) starting(ctx context.Context) error { // let's start the rest of subservices via manager servs := []services.Service(nil) + // Start active queried series service if enabled + if i.activeQueriedSeriesService != nil { + servs = append(servs, i.activeQueriedSeriesService) + } + compactionService := services.NewBasicService(nil, i.compactionLoop, nil) servs = append(servs, compactionService) @@ -983,6 +1041,13 @@ func (i *Ingester) updateLoop(ctx context.Context) error { defer t.Stop() } + var activeQueriedSeriesTickerChan <-chan time.Time + if i.cfg.ActiveQueriedSeriesMetricsEnabled { + t := time.NewTicker(i.cfg.ActiveQueriedSeriesMetricsUpdatePeriod) + activeQueriedSeriesTickerChan = t.C + defer t.Stop() + } + // Similarly to the above, this is a hardcoded value. metadataPurgeTicker := time.NewTicker(metadataPurgePeriod) defer metadataPurgeTicker.Stop() @@ -1009,6 +1074,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { case <-activeSeriesTickerChan: i.updateActiveSeries(ctx) + case <-activeQueriedSeriesTickerChan: + i.updateActiveQueriedSeries(ctx) case <-maxTrackerResetTicker.C: i.maxInflightQueryRequests.Tick() i.maxInflightPushRequests.Tick() @@ -1081,6 +1148,31 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { } } +func (i *Ingester) updateActiveQueriedSeries(ctx context.Context) { + now := time.Now() + for _, userID := range i.getTSDBUsers() { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil || userDB.activeQueriedSeries == nil { + continue + } + + // Purge expired windows for all trackers + userDB.activeQueriedSeries.Purge(now) + + // Get estimated cardinality for each configured window + for _, windowDuration := range i.cfg.ActiveQueriedSeriesMetricsWindows { + estimatedCount, err := userDB.activeQueriedSeries.GetSeriesQueried(now, windowDuration) + if err != nil { + level.Error(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to get active queried series count", "user", userID, "window", windowDuration, "err", err) + continue + } + + // Update metric with window label + i.metrics.activeQueriedSeriesPerUser.WithLabelValues(userID, windowDuration.String()).Set(float64(estimatedCount)) + } + } +} + func (i *Ingester) updateLabelSetMetrics() { activeUserSet := make(map[string]map[uint64]struct{}) for _, userID := range i.getTSDBUsers() { @@ -2316,6 +2408,18 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th defer putTimeSeriesChunksSlice(chunkSeries) batchSizeBytes := 0 var it chunks.Iterator + + now := time.Now() + // Check sampling decision early to avoid calculating hashes if batch will be skipped + var queriedSeriesHashes []uint64 + sampled := false + if db.activeQueriedSeries != nil { + sampled = db.activeQueriedSeries.SampleRequest() + if sampled { + queriedSeriesHashes = make([]uint64, 0, 1024) // Pre-allocate with reasonable capacity + } + } + for ss.Next() { series := ss.At() lbls := series.Labels() @@ -2328,6 +2432,12 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th continue } + // Collect hash for batched tracking (only if sampling decision allows) + if sampled { + hash := lbls.Hash() + queriedSeriesHashes = append(queriedSeriesHashes, hash) + } + // convert labels to LabelAdapter ts := client.TimeSeriesChunk{ Labels: cortexpb.FromLabelsToLabelAdapters(lbls), @@ -2392,6 +2502,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th return 0, 0, 0, 0, err } + // Update active queried series tracking in a single batched call + if sampled && len(queriedSeriesHashes) > 0 && db.activeQueriedSeries != nil && i.activeQueriedSeriesService != nil { + i.activeQueriedSeriesService.UpdateSeriesBatch(db.activeQueriedSeries, queriedSeriesHashes, now, db.userID) + } + // Final flush any existing metrics if batchSizeBytes != 0 { err = client.SendQueryStream(stream, &client.QueryStreamResponse{ @@ -2512,9 +2627,20 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(userID, i.metrics.expandedPostingsCacheMetrics) } + var activeQueriedSeries *ActiveQueriedSeries + if i.cfg.ActiveQueriedSeriesMetricsEnabled { + activeQueriedSeries = NewActiveQueriedSeries( + i.cfg.ActiveQueriedSeriesMetricsWindows, + i.cfg.ActiveQueriedSeriesMetricsWindowDuration, + i.cfg.ActiveQueriedSeriesMetricsSampleRate, + i.logger, + ) + } + userDB := &userTSDB{ userID: userID, activeSeries: NewActiveSeries(), + activeQueriedSeries: activeQueriedSeries, seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), labelSetCounter: newLabelSetCounter(i.limiter), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 2246f07402..10ea68982b 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -7305,3 +7305,172 @@ func (*panickingMatchersCache) GetOrSet(_ storecache.ConversionLabelMatcher, _ s a[1] = 2 // index out of range return nil, nil } + +func TestIngester_ActiveQueriedSeries(t *testing.T) { + registry := prometheus.NewRegistry() + + // Create ingester config with active queried series enabled + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.ActiveQueriedSeriesMetricsEnabled = true + cfg.ActiveQueriedSeriesMetricsUpdatePeriod = 5 * time.Second + cfg.ActiveQueriedSeriesMetricsWindowDuration = 5 * time.Second + cfg.ActiveQueriedSeriesMetricsSampleRate = 1.0 // Sample all queries + cfg.ActiveQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{5 * time.Second, 10 * time.Second} + + // Create ingester + i, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return i.lifecycler.GetState() + }) + + // Push some sample data + now := time.Now() + for idx := range 10 { + req := &cortexpb.WriteRequest{} + for seriesIdx := range 5 { + req.Timeseries = append(req.Timeseries, cortexpb.PreallocTimeseries{ + TimeSeries: &cortexpb.TimeSeries{ + Labels: []cortexpb.LabelAdapter{ + {Name: labels.MetricName, Value: "test_metric"}, + {Name: "series", Value: fmt.Sprintf("series_%d", seriesIdx)}, + }, + Samples: []cortexpb.Sample{ + {Value: float64(idx), TimestampMs: now.Add(time.Duration(idx) * time.Second).UnixMilli()}, + }, + }, + }) + } + _, err := i.Push(ctx, req) + require.NoError(t, err) + } + + // Verify initial state - no queries run yet, so metric should be 0 or not exist + metricsBefore := fetchMetrics(t, registry, "cortex_ingester_active_queried_series") + t.Logf("Metrics before query: %v", metricsBefore) + + // Run a query to trigger active queried series tracking + matcher := &client.LabelMatcher{ + Type: client.REGEX_MATCH, + Name: labels.MetricName, + Value: ".*", + } + + req := &client.QueryRequest{ + StartTimestampMs: now.Add(-1 * time.Hour).UnixMilli(), + EndTimestampMs: now.Add(1 * time.Hour).UnixMilli(), + Matchers: []*client.LabelMatcher{matcher}, + } + + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(req, s) + require.NoError(t, err) + require.NotEmpty(t, s.series, "Query should return some series") + t.Logf("Query returned %d series", len(s.series)) + + // Wait a bit for the async updates to be processed by the worker goroutines + time.Sleep(100 * time.Millisecond) + + // Manually trigger the update of active queried series metrics + // This simulates the periodic update that would normally happen + i.updateActiveQueriedSeries(context.Background()) + + // Check that the metric was updated + metricsAfter := fetchMetrics(t, registry, "cortex_ingester_active_queried_series") + t.Logf("Metrics after query: %v", metricsAfter) + + // Verify the metric exists and has a reasonable value + found := false + for _, metric := range metricsAfter { + if strings.Contains(metric, "cortex_ingester_active_queried_series") && + strings.Contains(metric, `user="test-user"`) { + found = true + t.Logf("Found active queried series metric: %s", metric) + + // Extract the value from the metric line + // Format: cortex_ingester_active_queried_series{user="test-user",window="5s"} VALUE + parts := strings.Fields(metric) + if len(parts) >= 2 { + value := parts[len(parts)-1] + t.Logf("Metric value: %s", value) + // Value should be greater than 0 since we queried series + assert.NotEqual(t, "0", value, "Active queried series should be greater than 0 after running a query") + } + } + } + assert.True(t, found, "Should find cortex_ingester_active_queried_series metric for test-user") + + // Run another query with a more specific matcher to verify tracking + specificMatcher := &client.LabelMatcher{ + Type: client.EQUAL, + Name: "series", + Value: "series_0", + } + req2 := &client.QueryRequest{ + StartTimestampMs: now.Add(-1 * time.Hour).UnixMilli(), + EndTimestampMs: now.Add(1 * time.Hour).UnixMilli(), + Matchers: []*client.LabelMatcher{matcher, specificMatcher}, + } + + s2 := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(req2, s2) + require.NoError(t, err) + t.Logf("Second query returned %d series", len(s2.series)) + + // Wait a bit for the async updates to be processed + time.Sleep(100 * time.Millisecond) + + // Update metrics again + i.updateActiveQueriedSeries(context.Background()) + + // Verify metrics are still present + finalMetrics := fetchMetrics(t, registry, "cortex_ingester_active_queried_series") + t.Logf("Final metrics: %v", finalMetrics) + + foundFinal := false + for _, metric := range finalMetrics { + if strings.Contains(metric, "cortex_ingester_active_queried_series") && + strings.Contains(metric, `user="test-user"`) { + foundFinal = true + break + } + } + assert.True(t, foundFinal, "Active queried series metric should still be present after second query") +} + +// fetchMetrics is a helper function to fetch metrics from a registry that match a prefix +func fetchMetrics(t *testing.T, registry *prometheus.Registry, prefix string) []string { + metricFamilies, err := registry.Gather() + require.NoError(t, err) + + var metrics []string + for _, mf := range metricFamilies { + if strings.HasPrefix(mf.GetName(), prefix) { + for _, m := range mf.GetMetric() { + var labels []string + for _, label := range m.GetLabel() { + labels = append(labels, fmt.Sprintf(`%s="%s"`, label.GetName(), label.GetValue())) + } + labelStr := strings.Join(labels, ",") + + var value string + if m.Counter != nil { + value = fmt.Sprintf("%v", m.Counter.GetValue()) + } else if m.Gauge != nil { + value = fmt.Sprintf("%v", m.Gauge.GetValue()) + } + + metricLine := fmt.Sprintf("%s{%s} %s", mf.GetName(), labelStr, value) + metrics = append(metrics, metricLine) + } + } + } + return metrics +} diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index f74d2295f8..30bac26f59 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -53,10 +53,11 @@ type ingesterMetrics struct { memMetadataRemovedTotal *prometheus.CounterVec pushErrorsTotal *prometheus.CounterVec - activeSeriesPerUser *prometheus.GaugeVec - activeNHSeriesPerUser *prometheus.GaugeVec - limitsPerLabelSet *prometheus.GaugeVec - usagePerLabelSet *prometheus.GaugeVec + activeSeriesPerUser *prometheus.GaugeVec + activeNHSeriesPerUser *prometheus.GaugeVec + activeQueriedSeriesPerUser *prometheus.GaugeVec + limitsPerLabelSet *prometheus.GaugeVec + usagePerLabelSet *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -76,6 +77,7 @@ type ingesterMetrics struct { func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, + activeQueriedSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightPushRequests *util_math.MaxTracker, @@ -273,6 +275,12 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_active_native_histogram_series", Help: "Number of currently active native histogram series per user.", }, []string{"user"}), + + // Not registered automatically, but only if activeQueriedSeriesEnabled is true. + activeQueriedSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_queried_series", + Help: "Estimated number of currently active queried series per user (probabilistic count using HyperLogLog).", + }, []string{"user", "window"}), } if postingsCacheEnabled && r != nil { @@ -284,6 +292,10 @@ func newIngesterMetrics(r prometheus.Registerer, r.MustRegister(m.activeNHSeriesPerUser) } + if activeQueriedSeriesEnabled && r != nil { + r.MustRegister(m.activeQueriedSeriesPerUser) + } + if createMetricsConflictingWithTSDB { m.memSeriesCreatedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: memSeriesCreatedTotalName, @@ -305,6 +317,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.memMetadataRemovedTotal.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID) m.activeNHSeriesPerUser.DeleteLabelValues(userID) + m.activeQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 40f84f3f97..fcd0812ae0 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -23,6 +23,7 @@ func TestIngesterMetrics(t *testing.T) { m := newIngesterMetrics(mainReg, false, true, + false, func() *InstanceLimits { return &InstanceLimits{ MaxIngestionRate: 12, diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go index e07d174f41..d42bbe1d88 100644 --- a/pkg/ingester/user_metrics_metadata_test.go +++ b/pkg/ingester/user_metrics_metadata_test.go @@ -23,6 +23,7 @@ func Test_UserMetricsMetadata(t *testing.T) { maxInflightQueryRequests := util_math.MaxTracker{} m := newIngesterMetrics(reg, + false, false, false, func() *InstanceLimits { diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index fbdca1b6fa..3cdc629025 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4479,6 +4479,41 @@ "ingester_config": { "description": "The ingester_config configures the Cortex ingester.", "properties": { + "active_queried_series_metrics_enabled": { + "default": false, + "description": "Enable tracking of active queried series using probabilistic data structure and export them as metrics.", + "type": "boolean", + "x-cli-flag": "ingester.active-queried-series-metrics-enabled" + }, + "active_queried_series_metrics_sample_rate": { + "default": 1, + "description": "Sampling rate for active queried series tracking (1.0 = 100% sampling, 0.1 = 10% sampling). By default, all queries are sampled.", + "type": "number", + "x-cli-flag": "ingester.active-queried-series-metrics-sample-rate" + }, + "active_queried_series_metrics_update_period": { + "default": "1m0s", + "description": "How often to update active queried series metrics.", + "type": "string", + "x-cli-flag": "ingester.active-queried-series-metrics-update-period", + "x-format": "duration" + }, + "active_queried_series_metrics_window_duration": { + "default": "15m0s", + "description": "Duration of each sub-window for active queried series tracking (e.g., 1 minute). Used to divide the total tracking period into smaller windows.", + "type": "string", + "x-cli-flag": "ingester.active-queried-series-metrics-window-duration", + "x-format": "duration" + }, + "active_queried_series_metrics_windows": { + "default": "2h0m0s", + "description": "Time windows to expose queried series metric. Each window tracks queried series within that time period.", + "items": { + "type": "string" + }, + "type": "array", + "x-cli-flag": "ingester.active-queried-series-metrics-windows" + }, "active_series_metrics_enabled": { "default": true, "description": "Enable tracking of active series and export them as metrics.", diff --git a/vendor/github.com/axiomhq/hyperloglog/.gitignore b/vendor/github.com/axiomhq/hyperloglog/.gitignore new file mode 100644 index 0000000000..a1338d6851 --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/.gitignore @@ -0,0 +1,14 @@ +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ diff --git a/vendor/github.com/axiomhq/hyperloglog/Contributing.md b/vendor/github.com/axiomhq/hyperloglog/Contributing.md new file mode 100644 index 0000000000..d0eee97178 --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/Contributing.md @@ -0,0 +1,41 @@ +## How to Contribute + +👍🎉 First of all, thank you for your interest in Axiom-node! We'd love to accept your patches and contributions! 🎉👍 + +This project accepts contributions. In order to contribute, you should pay attention to a few guidelines: + +## Reporting Issues + +Bugs, feature requests, and development-related questions should be directed to our [GitHub issue tracker](https://github.com/axiomhq/hyperloglog/issues). + +When reporting a bug, please try and provide as much context as possible such as your operating system, Go version and anything else that might be relevant to the bug. For feature requests, please explain what you're trying to do and how the requested feature would help you do that. + +## Setup + +[Fork](https://github.com/axiomhq/hyperloglog.git), then clone this repository: + +``` +git clone https://github.com/axiomhq/hyperloglog.git +cd hyperloglog +cd demo +go run hyperloglog_demo.go +``` + +## Submitting Modifications + +1. It's generally best to start by opening a new issue describing the bug or feature you're intending to fix. Even if you think it's relatively minor, it's helpful to know what people are working on. Mention in the initial issue that you are planning to work on that bug or feature so that it can be assigned to you. + +2. Follow the normal process of [forking](https://docs.github.com/en/free-pro-team@latest/github/getting-started-with-github/fork-a-repo) the project, and setup a new branch to work in. It's important that each group of changes be done in separate branches in order to ensure that a pull request only includes the commits related to that bug or feature. + +3. Go makes it very simple to ensure properly formatted code, so always run `go fmt` on your code before committing it. + +4. Do your best to have [well-formated commit messages](https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) +for each change. This provides consistency throughout the project and ensures that commit messages are able to be formatted properly by various git tools. + +5. Finally, push the commits to your fork and submit a [pull request](https://docs.github.com/en/free-pro-team@latest/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request) + +### Once you've filed the PR: + +- One or more maintainers will use GitHub's review feature to review your PR. +- If the maintainer asks for any changes, edit your changes, push, and ask for another review. +- If the maintainer decides to suggest some improvements or alternatives, modify and make improvements. Once your changes are approved, one of the project maintainers will merge them. \ No newline at end of file diff --git a/vendor/github.com/axiomhq/hyperloglog/LICENSE b/vendor/github.com/axiomhq/hyperloglog/LICENSE new file mode 100644 index 0000000000..1ff7ea96ea --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2021, Axiom, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/vendor/github.com/axiomhq/hyperloglog/README.md b/vendor/github.com/axiomhq/hyperloglog/README.md new file mode 100644 index 0000000000..f2eee54f99 --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/README.md @@ -0,0 +1,51 @@ +# HyperLogLog - an algorithm for approximating the number of distinct elements + +[![GoDoc](https://godoc.org/github.com/axiomhq/hyperloglog?status.svg)](https://godoc.org/github.com/axiomhq/hyperloglog) [![Go Report Card](https://goreportcard.com/badge/github.com/axiomhq/hyperloglog)](https://goreportcard.com/report/github.com/axiomhq/hyperloglog) [![CircleCI](https://circleci.com/gh/axiomhq/hyperloglog/tree/master.svg?style=svg)](https://circleci.com/gh/axiomhq/hyperloglog/tree/master) + +An improved version of [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) for the count-distinct problem, approximating the number of distinct elements in a multiset. This implementation offers enhanced performance, flexibility, and simplicity while maintaining accuracy. + +## Note on Implementation History + +The initial version of this work (tagged as v0.1.0) was based on ["Better with fewer bits: Improving the performance of cardinality estimation of large data streams - Qingjun Xiao, You Zhou, Shigang Chen"](https://www.cise.ufl.edu/~sgchen/Publications/XZC17.pdf). However, the current implementation has evolved significantly from this original basis, notably moving away from the tailcut method. + +## Current Implementation + +The current implementation is based on the LogLog-Beta algorithm, as described in: + +["LogLog-Beta and More: A New Algorithm for Cardinality Estimation Based on LogLog Counting"](https://arxiv.org/pdf/1612.02284) by Jason Qin, Denys Kim, and Yumei Tung (2016). + +Key features of the current implementation: +* **Metro hash** used instead of xxhash +* **Sparse representation** for lower cardinalities (like HyperLogLog++) +* **LogLog-Beta** for dynamic bias correction across all cardinalities +* **8-bit registers** for convenience and simplified implementation +* **Order-independent insertions and merging** for consistent results regardless of data input order +* **Removal of tailcut method** for a more straightforward approach +* **Flexible precision** allowing for 2^4 to 2^18 registers + +This implementation is now more straightforward, efficient, and flexible, while remaining backwards compatible with previous versions. It provides a balance between precision, memory usage, speed, and ease of use. + +## Precision and Memory Usage + +This implementation allows for creating HyperLogLog sketches with arbitrary precision between 2^4 and 2^18 registers. The memory usage scales with the number of registers: + +* Minimum (2^4 registers): 16 bytes +* Default (2^14 registers): 16 KB +* Maximum (2^18 registers): 256 KB + +Users can choose the precision that best fits their use case, balancing memory usage against estimation accuracy. + +## Note +A big thank you to Prof. Shigang Chen and his team at the University of Florida who are actively conducting research around "Big Network Data". + +## Contributing + +Kindly check our [contributing guide](https://github.com/axiomhq/hyperloglog/blob/main/Contributing.md) on how to propose bugfixes and improvements, and submitting pull requests to the project + +## License + +© Axiom, Inc., 2024 + +Distributed under MIT License (`The MIT License`). + +See [LICENSE](LICENSE) for more information. diff --git a/vendor/github.com/axiomhq/hyperloglog/beta.go b/vendor/github.com/axiomhq/hyperloglog/beta.go new file mode 100644 index 0000000000..29d560136a --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/beta.go @@ -0,0 +1,273 @@ +package hyperloglog + +import ( + "fmt" + "math" +) + +var betaMap = map[uint8]func(float64) float64{ + 4: beta4, + 5: beta5, + 6: beta6, + 7: beta7, + 8: beta8, + 9: beta9, + 10: beta10, + 11: beta11, + 12: beta12, + 13: beta13, + 14: beta14, + 15: beta15, + 16: beta16, + 17: beta17, + 18: beta18, +} + +func beta(p uint8, ez float64) float64 { + f, ok := betaMap[p] + if !ok { + panic(fmt.Sprintf("invalid precision %d", p)) + } + return f(ez) +} + +/* +p=4 +[-0.582581413904517,-1.935300357560050,11.07932375 8035073,-22.131357446444323,22.505391846630037,-12 .000723834917984,3.220579408194167,-0.342225302271 235] +*/ +func beta4(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.582581413904517*ez + + -1.935300357560050*zl + + 11.079323758035073*math.Pow(zl, 2) + + -22.131357446444323*math.Pow(zl, 3) + + 22.505391846630037*math.Pow(zl, 4) + + -12.000723834917984*math.Pow(zl, 5) + + 3.220579408194167*math.Pow(zl, 6) + + -0.342225302271235*math.Pow(zl, 7) +} + +/* +p=5 +[-0.7518999460733967,-0.9590030077748760,5.5997371 322141607,-8.2097636999765520,6.5091254894472037,- 2.6830293734323729,0.5612891113138221,-0.046333162 2196545] +*/ +func beta5(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.7518999460733967*ez + + -0.9590030077748760*zl + + 5.5997371322141607*math.Pow(zl, 2) + + -8.2097636999765520*math.Pow(zl, 3) + + 6.5091254894472037*math.Pow(zl, 4) + + -2.6830293734323729*math.Pow(zl, 5) + + 0.5612891113138221*math.Pow(zl, 6) + + -0.0463331622196545*math.Pow(zl, 7) +} + +/* +p=6 +[29.8257900969619634,-31.3287083337725925,-10.5942 523036582283,-11.5720125689099618,3.81887543739074 92,-2.4160130328530811,0.4542208940970826,-0.05751 55452020420] +*/ +func beta6(ez float64) float64 { + zl := math.Log(ez + 1) + return 29.8257900969619634*ez + + -31.3287083337725925*zl + + -10.5942523036582283*math.Pow(zl, 2) + + -11.5720125689099618*math.Pow(zl, 3) + + 3.8188754373907492*math.Pow(zl, 4) + + -2.4160130328530811*math.Pow(zl, 5) + + 0.4542208940970826*math.Pow(zl, 6) + + -0.0575155452020420*math.Pow(zl, 7) +} + +/* +p=7 +[2.8102921290820060,-3.9780498518175995,1.31626800 41351582,-3.9252486335805901,2.0080835753946471,-0 .7527151937556955,0.1265569894242751,-0.0109946438726240] +*/ +func beta7(ez float64) float64 { + zl := math.Log(ez + 1) + return 2.8102921290820060*ez + + -3.9780498518175995*zl + + 1.3162680041351582*math.Pow(zl, 2) + + -3.9252486335805901*math.Pow(zl, 3) + + 2.0080835753946471*math.Pow(zl, 4) + + -0.7527151937556955*math.Pow(zl, 5) + + 0.1265569894242751*math.Pow(zl, 6) + + -0.0109946438726240*math.Pow(zl, 7) +} + +/* +p=8 +[1.00633544887550519,-2.00580666405112407,1.643697 49366514117,-2.70560809940566172,1.392099802442225 98,-0.46470374272183190,0.07384282377269775,-0.00578554885254223] +*/ +func beta8(ez float64) float64 { + zl := math.Log(ez + 1) + return 1.00633544887550519*ez + + -2.00580666405112407*zl + + 1.64369749366514117*math.Pow(zl, 2) + + -2.70560809940566172*math.Pow(zl, 3) + + 1.39209980244222598*math.Pow(zl, 4) + + -0.46470374272183190*math.Pow(zl, 5) + + 0.07384282377269775*math.Pow(zl, 6) + + -0.00578554885254223*math.Pow(zl, 7) +} + +/* +p=9 +[-0.09415657458167959,-0.78130975924550528,1.71514 946750712460,-1.73711250406516338,0.86441508489048 924,-0.23819027465047218,0.03343448400269076,-0.00 207858528178157] +*/ +func beta9(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.09415657458167959*ez + + -0.78130975924550528*zl + + 1.71514946750712460*math.Pow(zl, 2) + + -1.73711250406516338*math.Pow(zl, 3) + + 0.86441508489048924*math.Pow(zl, 4) + + -0.23819027465047218*math.Pow(zl, 5) + + 0.03343448400269076*math.Pow(zl, 6) + + -0.00207858528178157*math.Pow(zl, 7) +} + +/* +p=10 +[-0.25935400670790054,-0.52598301999805808,1.48933 034925876839,-1.29642714084993571,0.62284756217221615,-0.15672326770251041,0.02054415903878563,-0.00 112488483925502] +*/ +func beta10(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.25935400670790054*ez + + -0.52598301999805808*zl + + 1.48933034925876839*math.Pow(zl, 2) + + -1.29642714084993571*math.Pow(zl, 3) + + 0.62284756217221615*math.Pow(zl, 4) + + -0.15672326770251041*math.Pow(zl, 5) + + 0.02054415903878563*math.Pow(zl, 6) + + -0.00112488483925502*math.Pow(zl, 7) +} + +/* +p=11 +[-4.32325553856025e-01,-1.08450736399632e-01,6.091 56550741120e-01,-1.65687801845180e-02,-7.958293410 87617e-02,4.71830602102918e-02,-7.81372902346934e- 03,5.84268708489995e-04] +*/ +func beta11(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.432325553856025*ez + + -0.108450736399632*zl + + 0.609156550741120*math.Pow(zl, 2) + + -0.0165687801845180*math.Pow(zl, 3) + + -0.0795829341087617*math.Pow(zl, 4) + + 0.0471830602102918*math.Pow(zl, 5) + + -0.00781372902346934*math.Pow(zl, 6) + + 0.000584268708489995*math.Pow(zl, 7) +} + +/* +p=12 +[-3.84979202588598e-01,1.83162233114364e-01,1.3039 6688841854e-01,7.04838927629266e-02,-8.95893971464 453e-03,1.13010036741605e-02,-1.94285569591290e-03 ,2.25435774024964e-04] +*/ +func beta12(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.384979202588598*ez + + 0.183162233114364*zl + + 0.130396688841854*math.Pow(zl, 2) + + 0.0704838927629266*math.Pow(zl, 3) + + -0.0089589397146453*math.Pow(zl, 4) + + 0.0113010036741605*math.Pow(zl, 5) + + -0.00194285569591290*math.Pow(zl, 6) + + 0.000225435774024964*math.Pow(zl, 7) +} + +/* +p=13 +[-0.41655270946462997,-0.22146677040685156,0.38862 131236999947,0.45340979746062371,-0.36264738324476 375,0.12304650053558529,-0.01701540384555510,0.001 02750367080838] +*/ +func beta13(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.41655270946462997*ez + + -0.22146677040685156*zl + + 0.38862131236999947*math.Pow(zl, 2) + + 0.45340979746062371*math.Pow(zl, 3) + + -0.36264738324476375*math.Pow(zl, 4) + + 0.12304650053558529*math.Pow(zl, 5) + + -0.01701540384555510*math.Pow(zl, 6) + + 0.00102750367080838*math.Pow(zl, 7) +} + +/* +p=14 +[-3.71009760230692e-01,9.78811941207509e-03,1.8579 6293324165e-01,2.03015527328432e-01,-1.16710521803 686e-01,4.31106699492820e-02,-5.99583540511831e-03 ,4.49704299509437e-04] +*/ + +func beta14(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.371009760230692*ez + + 0.00978811941207509*zl + + 0.185796293324165*math.Pow(zl, 2) + + 0.203015527328432*math.Pow(zl, 3) + + -0.116710521803686*math.Pow(zl, 4) + + 0.0431106699492820*math.Pow(zl, 5) + + -0.00599583540511831*math.Pow(zl, 6) + + 0.000449704299509437*math.Pow(zl, 7) +} + +/* +p=15 +[-0.38215145543875273,-0.89069400536090837,0.37602 335774678869,0.99335977440682377,-0.65577441638318 956,0.18332342129703610,-0.02241529633062872,0.001 21399789330194] +*/ +func beta15(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.38215145543875273*ez + + -0.89069400536090837*zl + + 0.37602335774678869*math.Pow(zl, 2) + + 0.99335977440682377*math.Pow(zl, 3) + + -0.65577441638318956*math.Pow(zl, 4) + + 0.18332342129703610*math.Pow(zl, 5) + + -0.02241529633062872*math.Pow(zl, 6) + + 0.00121399789330194*math.Pow(zl, 7) +} + +/* +p=16 +[-0.37331876643753059,-1.41704077448122989,0.407291 84796612533,1.56152033906584164,-0.99242233534286128,0.26064681399483092,-0.03053811369682807,0.00155770210179105] +*/ +func beta16(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.37331876643753059*ez + + -1.41704077448122989*zl + + 0.40729184796612533*math.Pow(zl, 2) + + 1.56152033906584164*math.Pow(zl, 3) + + -0.99242233534286128*math.Pow(zl, 4) + + 0.26064681399483092*math.Pow(zl, 5) + + -0.03053811369682807*math.Pow(zl, 6) + + 0.00155770210179105*math.Pow(zl, 7) +} + +/* +p=17 +[-0.36775502299404605,0.53831422351377967,0.769702 89278767923,0.55002583586450560,-0.745755882611469 41,0.25711835785821952,-0.03437902606864149,0.0018 5949146371616] +*/ +func beta17(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.36775502299404605*ez + + 0.53831422351377967*zl + + 0.76970289278767923*math.Pow(zl, 2) + + 0.55002583586450560*math.Pow(zl, 3) + + -0.74575588261146941*math.Pow(zl, 4) + + 0.25711835785821952*math.Pow(zl, 5) + + -0.03437902606864149*math.Pow(zl, 6) + + 0.00185949146371616*math.Pow(zl, 7) +} + +/* +p=18 +[-0.36479623325960542,0.99730412328635032,1.553543 86230081221,1.25932677198028919,-1.533259482091101 63,0.47801042200056593,-0.05951025172951174,0.0029 1076804642205] +*/ +func beta18(ez float64) float64 { + zl := math.Log(ez + 1) + return -0.36479623325960542*ez + + 0.99730412328635032*zl + + 1.55354386230081221*math.Pow(zl, 2) + + 1.25932677198028919*math.Pow(zl, 3) + + -1.53325948209110163*math.Pow(zl, 4) + + 0.47801042200056593*math.Pow(zl, 5) + + -0.05951025172951174*math.Pow(zl, 6) + + 0.00291076804642205*math.Pow(zl, 7) +} diff --git a/vendor/github.com/axiomhq/hyperloglog/compressed.go b/vendor/github.com/axiomhq/hyperloglog/compressed.go new file mode 100644 index 0000000000..24425b4da6 --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/compressed.go @@ -0,0 +1,166 @@ +package hyperloglog + +import ( + "encoding/binary" + "slices" +) + +// Original author of this file is github.com/clarkduvall/hyperloglog + +type iterator struct { + i int + last uint32 + v *compressedList +} + +func (iter *iterator) Next() uint32 { + n, i := iter.v.decode(iter.i, iter.last) + iter.last = n + iter.i = i + return n +} + +func (iter *iterator) Peek() (uint32, int) { + return iter.v.decode(iter.i, iter.last) +} + +func (iter *iterator) Advance(last uint32, i int) { + iter.last = last + iter.i = i +} + +func (iter iterator) HasNext() bool { + return iter.i < iter.v.Len() +} + +type compressedList struct { + count uint32 + last uint32 + b variableLengthList +} + +func (v *compressedList) Clone() *compressedList { + if v == nil { + return nil + } + + newV := &compressedList{ + count: v.count, + last: v.last, + } + + newV.b = make(variableLengthList, len(v.b)) + copy(newV.b, v.b) + return newV +} + +func (v *compressedList) AppendBinary(data []byte) ([]byte, error) { + // At least 4 bytes for the two fixed sized values + data = slices.Grow(data, 4+4) + + // Marshal the count and last values. + data = append(data, + // Number of items in the list. + byte(v.count>>24), + byte(v.count>>16), + byte(v.count>>8), + byte(v.count), + // The last item in the list. + byte(v.last>>24), + byte(v.last>>16), + byte(v.last>>8), + byte(v.last), + ) + + // Append the variableLengthList + return v.b.AppendBinary(data) +} + +func (v *compressedList) UnmarshalBinary(data []byte) error { + if len(data) < 12 { + return ErrorTooShort + } + + // Set the count. + v.count, data = binary.BigEndian.Uint32(data[:4]), data[4:] + + // Set the last value. + v.last, data = binary.BigEndian.Uint32(data[:4]), data[4:] + + // Set the list. + sz, data := binary.BigEndian.Uint32(data[:4]), data[4:] + v.b = make([]uint8, sz) + if uint32(len(data)) < sz { + return ErrorTooShort + } + for i := uint32(0); i < sz; i++ { + v.b[i] = data[i] + } + return nil +} + +func newCompressedList(capacity int) *compressedList { + v := &compressedList{} + v.b = make(variableLengthList, 0, capacity) + return v +} + +func (v *compressedList) Len() int { + return len(v.b) +} + +func (v *compressedList) decode(i int, last uint32) (uint32, int) { + n, i := v.b.decode(i) + return n + last, i +} + +func (v *compressedList) Append(x uint32) { + v.count++ + v.b = v.b.Append(x - v.last) + v.last = x +} + +func (v *compressedList) Iter() iterator { + return iterator{0, 0, v} +} + +type variableLengthList []uint8 + +func (v variableLengthList) AppendBinary(data []byte) ([]byte, error) { + // 4 bytes for the size of the list, and a byte for each element in the + // list. + data = slices.Grow(data, 4+len(v)) + + // Length of the list. We only need 32 bits because the size of the set + // couldn't exceed that on 32 bit architectures. + sz := len(v) + data = append(data, + byte(sz>>24), + byte(sz>>16), + byte(sz>>8), + byte(sz), + ) + + // Marshal each element in the list. + data = append(data, v...) + + return data, nil +} + +func (v variableLengthList) decode(i int) (uint32, int) { + var x uint32 + j := i + for ; v[j]&0x80 != 0; j++ { + x |= uint32(v[j]&0x7f) << (uint(j-i) * 7) + } + x |= uint32(v[j]) << (uint(j-i) * 7) + return x, j + 1 +} + +func (v variableLengthList) Append(x uint32) variableLengthList { + for x&0xffffff80 != 0 { + v = append(v, uint8((x&0x7f)|0x80)) + x >>= 7 + } + return append(v, uint8(x&0x7f)) +} diff --git a/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go new file mode 100644 index 0000000000..0ebfd124f1 --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/hyperloglog.go @@ -0,0 +1,354 @@ +package hyperloglog + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "slices" +) + +const ( + pp = uint8(25) + mp = uint32(1) << pp + version = 2 +) + +type Sketch struct { + p uint8 + m uint32 + alpha float64 + tmpSet set + sparseList *compressedList + regs []uint8 +} + +// New returns a HyperLogLog Sketch with 2^14 registers (precision 14) +func New() *Sketch { return New14() } + +// New14 returns a HyperLogLog Sketch with 2^14 registers (precision 14) +func New14() *Sketch { return newSketchNoError(14, true) } + +// New16 returns a HyperLogLog Sketch with 2^16 registers (precision 16) +func New16() *Sketch { return newSketchNoError(16, true) } + +// NewNoSparse returns a HyperLogLog Sketch with 2^14 registers (precision 14) that will not use a sparse representation +func NewNoSparse() *Sketch { return newSketchNoError(14, false) } + +// New16NoSparse returns a HyperLogLog Sketch with 2^16 registers (precision 16) that will not use a sparse representation +func New16NoSparse() *Sketch { return newSketchNoError(16, false) } + +func newSketchNoError(precision uint8, sparse bool) *Sketch { + sk, _ := NewSketch(precision, sparse) + return sk +} + +func NewSketch(precision uint8, sparse bool) (*Sketch, error) { + if precision < 4 || precision > 18 { + return nil, fmt.Errorf("p has to be >= 4 and <= 18") + } + m := uint32(1) << precision + s := &Sketch{ + m: m, + p: precision, + alpha: alpha(float64(m)), + } + if sparse { + s.tmpSet = makeSet(0) + s.sparseList = newCompressedList(0) + } else { + s.regs = make([]uint8, m) + } + return s, nil +} + +func (sk *Sketch) sparse() bool { return sk.sparseList != nil } + +// Clone returns a deep copy of sk. +func (sk *Sketch) Clone() *Sketch { + clone := *sk + clone.regs = append([]uint8(nil), sk.regs...) + clone.tmpSet = sk.tmpSet.Clone() + clone.sparseList = sk.sparseList.Clone() + return &clone +} + +func (sk *Sketch) maybeToNormal() { + if uint32(sk.tmpSet.Len())*100 > sk.m { + sk.mergeSparse() + if uint32(sk.sparseList.Len()) > sk.m { + sk.toNormal() + } + } +} + +func (sk *Sketch) Merge(other *Sketch) error { + if other == nil { + return nil + } + if sk.p != other.p { + return errors.New("precisions must be equal") + } + + if sk.sparse() && other.sparse() { + sk.mergeSparseSketch(other) + } else { + sk.mergeDenseSketch(other) + } + return nil +} + +func (sk *Sketch) mergeSparseSketch(other *Sketch) { + sk.tmpSet.Merge(other.tmpSet) + for iter := other.sparseList.Iter(); iter.HasNext(); { + sk.tmpSet.add(iter.Next()) + } + sk.maybeToNormal() +} + +func (sk *Sketch) mergeDenseSketch(other *Sketch) { + if sk.sparse() { + sk.toNormal() + } + + if other.sparse() { + other.tmpSet.ForEach(func(k uint32) { + i, r := decodeHash(k, other.p, pp) + sk.insert(i, r) + }) + for iter := other.sparseList.Iter(); iter.HasNext(); { + i, r := decodeHash(iter.Next(), other.p, pp) + sk.insert(i, r) + } + } else { + for i, v := range other.regs { + if v > sk.regs[i] { + sk.regs[i] = v + } + } + } +} + +func (sk *Sketch) toNormal() { + if sk.tmpSet.Len() > 0 { + sk.mergeSparse() + } + + sk.regs = make([]uint8, sk.m) + for iter := sk.sparseList.Iter(); iter.HasNext(); { + i, r := decodeHash(iter.Next(), sk.p, pp) + sk.insert(i, r) + } + + sk.tmpSet = nilSet + sk.sparseList = nil +} + +func (sk *Sketch) insert(i uint32, r uint8) { sk.regs[i] = max(r, sk.regs[i]) } +func (sk *Sketch) Insert(e []byte) { sk.InsertHash(hash(e)) } + +func (sk *Sketch) InsertHash(x uint64) { + if sk.sparse() { + if sk.tmpSet.add(encodeHash(x, sk.p, pp)) { + sk.maybeToNormal() + } + return + } + i, r := getPosVal(x, sk.p) + sk.insert(uint32(i), r) +} + +func (sk *Sketch) Estimate() uint64 { + if sk.sparse() { + sk.mergeSparse() + return uint64(linearCount(mp, mp-sk.sparseList.count)) + } + + sum, ez := sumAndZeros(sk.regs) + m := float64(sk.m) + + est := sk.alpha * m * (m - ez) / (sum + beta(sk.p, ez)) + return uint64(est + 0.5) +} + +func (sk *Sketch) mergeSparse() { + if sk.tmpSet.Len() == 0 { + return + } + + keys := make([]uint32, 0, sk.tmpSet.Len()) + sk.tmpSet.ForEach(func(k uint32) { + keys = append(keys, k) + }) + slices.Sort(keys) + + newList := newCompressedList(4*sk.tmpSet.Len() + sk.sparseList.Len()) + for iter, i := sk.sparseList.Iter(), 0; iter.HasNext() || i < len(keys); { + if !iter.HasNext() { + newList.Append(keys[i]) + i++ + continue + } + + if i >= len(keys) { + newList.Append(iter.Next()) + continue + } + + x1, adv := iter.Peek() + x2 := keys[i] + if x1 == x2 { + newList.Append(x1) + iter.Advance(x1, adv) + i++ + } else if x1 > x2 { + newList.Append(x2) + i++ + } else { + newList.Append(x1) + iter.Advance(x1, adv) + } + } + + sk.sparseList = newList + sk.tmpSet = makeSet(0) +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface. +// +// When the result will be appended to another buffer, consider using +// AppendBinary to avoid additional allocations and copying. +func (sk *Sketch) MarshalBinary() (data []byte, err error) { + return sk.AppendBinary(nil) +} + +// AppendBinary implements the encoding.BinaryAppender interface. +func (sk *Sketch) AppendBinary(data []byte) ([]byte, error) { + data = slices.Grow(data, 8+len(sk.regs)) + // Marshal a version marker. + data = append(data, version) + // Marshal p. + data = append(data, sk.p) + // Marshal b + data = append(data, 0) + + if sk.sparse() { + // It's using the sparse Sketch. + data = append(data, byte(1)) + + // Add the tmp_set + data, err := sk.tmpSet.AppendBinary(data) + if err != nil { + return nil, err + } + + // Add the sparse Sketch + return sk.sparseList.AppendBinary(data) + } + + // It's using the dense Sketch. + data = append(data, byte(0)) + + // Add the dense sketch Sketch. + sz := len(sk.regs) + data = append(data, + byte(sz>>24), + byte(sz>>16), + byte(sz>>8), + byte(sz), + ) + + // Marshal each element in the list. + for _, v := range sk.regs { + data = append(data, byte(v)) + } + + return data, nil +} + +// ErrorTooShort is an error that UnmarshalBinary try to parse too short +// binary. +var ErrorTooShort = errors.New("too short binary") + +// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface. +func (sk *Sketch) UnmarshalBinary(data []byte) error { + if len(data) < 8 { + return ErrorTooShort + } + + // Unmarshal version. We may need this in the future if we make + // non-compatible changes. + v := data[0] + + // Unmarshal p. + p := data[1] + + // Unmarshal b. + b := data[2] + + // Determine if we need a sparse Sketch + sparse := data[3] == byte(1) + + // Make a newSketch Sketch if the precision doesn't match or if the Sketch was used + if sk.p != p || sk.regs != nil || sk.tmpSet.Len() > 0 || (sk.sparseList != nil && sk.sparseList.Len() > 0) { + newh, err := NewSketch(p, sparse) + if err != nil { + return err + } + *sk = *newh + } + + // h is now initialised with the correct p. We just need to fill the + // rest of the details out. + if sparse { + // Using the sparse Sketch. + + // Unmarshal the tmp_set. + tssz := binary.BigEndian.Uint32(data[4:8]) + sk.tmpSet = makeSet(int(tssz)) + + // We need to unmarshal tssz values in total, and each value requires us + // to read 4 bytes. + tsLastByte := int((tssz * 4) + 8) + for i := 8; i < tsLastByte; i += 4 { + k := binary.BigEndian.Uint32(data[i : i+4]) + sk.tmpSet.add(k) + } + + // Unmarshal the sparse Sketch. + return sk.sparseList.UnmarshalBinary(data[tsLastByte:]) + } + + // Using the dense Sketch. + sk.sparseList = nil + sk.tmpSet = nilSet + + if v == 1 { + return sk.unmarshalBinaryV1(data[8:], b) + } + return sk.unmarshalBinaryV2(data) +} + +func sumAndZeros(regs []uint8) (res, ez float64) { + for _, v := range regs { + if v == 0 { + ez++ + } + res += 1.0 / math.Pow(2.0, float64(v)) + } + return res, ez +} + +func (sk *Sketch) unmarshalBinaryV1(data []byte, b uint8) error { + sk.regs = make([]uint8, len(data)*2) + for i, v := range data { + sk.regs[i*2] = uint8((v >> 4)) + b + sk.regs[i*2+1] = uint8((v<<4)>>4) + b + } + return nil +} + +func (sk *Sketch) unmarshalBinaryV2(data []byte) error { + sk.regs = data[8:] + return nil +} diff --git a/vendor/github.com/axiomhq/hyperloglog/sparse.go b/vendor/github.com/axiomhq/hyperloglog/sparse.go new file mode 100644 index 0000000000..487d4083cf --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/sparse.go @@ -0,0 +1,112 @@ +package hyperloglog + +import ( + "math/bits" + "slices" + + "github.com/kamstrup/intmap" +) + +func getIndex(k uint32, p, pp uint8) uint32 { + if k&1 == 1 { + return bextr32(k, 32-p, p) + } + return bextr32(k, pp-p+1, p) +} + +// Encode a hash to be used in the sparse representation. +func encodeHash(x uint64, p, pp uint8) uint32 { + idx := uint32(bextr(x, 64-pp, pp)) + if bextr(x, 64-pp, pp-p) == 0 { + zeros := bits.LeadingZeros64((bextr(x, 0, 64-pp)<>24), + byte(sl>>16), + byte(sl>>8), + byte(sl), + ) + + // Marshal each element in the set. + s.m.ForEach(func(k uint32) bool { + data = append(data, + byte(k>>24), + byte(k>>16), + byte(k>>8), + byte(k), + ) + return true + }) + + return data, nil +} diff --git a/vendor/github.com/axiomhq/hyperloglog/utils.go b/vendor/github.com/axiomhq/hyperloglog/utils.go new file mode 100644 index 0000000000..c032c88d4b --- /dev/null +++ b/vendor/github.com/axiomhq/hyperloglog/utils.go @@ -0,0 +1,46 @@ +package hyperloglog + +import ( + "math" + "math/bits" + + metro "github.com/dgryski/go-metro" +) + +var hash = hashFunc + +func alpha(m float64) float64 { + switch m { + case 16: + return 0.673 + case 32: + return 0.697 + case 64: + return 0.709 + } + return 0.7213 / (1 + 1.079/m) +} + +func getPosVal(x uint64, p uint8) (uint64, uint8) { + i := bextr(x, 64-p, p) // {x63,...,x64-p} + w := x<

> start) & ((1 << length) - 1) +} + +func bextr32(v uint32, start, length uint8) uint32 { + return (v >> start) & ((1 << length) - 1) +} + +func hashFunc(e []byte) uint64 { + return metro.Hash64(e, 1337) +} diff --git a/vendor/github.com/kamstrup/intmap/.gitignore b/vendor/github.com/kamstrup/intmap/.gitignore new file mode 100644 index 0000000000..1377554ebe --- /dev/null +++ b/vendor/github.com/kamstrup/intmap/.gitignore @@ -0,0 +1 @@ +*.swp diff --git a/vendor/github.com/kamstrup/intmap/LICENSE b/vendor/github.com/kamstrup/intmap/LICENSE new file mode 100644 index 0000000000..1eac633b0c --- /dev/null +++ b/vendor/github.com/kamstrup/intmap/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2016, Brent Pedersen - Bioinformatics +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/kamstrup/intmap/README.md b/vendor/github.com/kamstrup/intmap/README.md new file mode 100644 index 0000000000..e1a1e7003a --- /dev/null +++ b/vendor/github.com/kamstrup/intmap/README.md @@ -0,0 +1,52 @@ +Fast hashmap with integer keys for Golang + +[![GoDoc](https://godoc.org/github.com/kamstrup/intmap?status.svg)](https://godoc.org/github.com/kamstrup/intmap) +[![Go Report Card](https://goreportcard.com/badge/github.com/kamstrup/intmap)](https://goreportcard.com/report/github.com/kamstrup/intmap) + +# intmap + + import "github.com/kamstrup/intmap" + +Package intmap is a fast hashmap implementation for Golang, specialized for maps with integer type keys. +The values can be of any type. + +It is a full port of https://github.com/brentp/intintmap to use type parameters (aka generics). + +It interleaves keys and values in the same underlying array to improve locality. +This is also known as open addressing with linear probing. + +It is up to 3X faster than the builtin map: +``` +name time/op +Map64Fill-8 201ms ± 5% +IntIntMapFill-8 207ms ±31% +StdMapFill-8 371ms ±11% +Map64Get10PercentHitRate-8 148µs ±40% +IntIntMapGet10PercentHitRate-8 171µs ±50% +StdMapGet10PercentHitRate-8 171µs ±33% +Map64Get100PercentHitRate-8 4.50ms ± 5% +IntIntMapGet100PercentHitRate-8 4.82ms ± 6% +StdMapGet100PercentHitRate-8 15.5ms ±32% +``` + +## Usage + +```go +m := intmap.New[int64,int64](32768) +m.Put(int64(1234), int64(-222)) +m.Put(int64(123), int64(33)) + +v, ok := m.Get(int64(222)) +v, ok := m.Get(int64(333)) + +m.Del(int64(222)) +m.Del(int64(333)) + +fmt.Println(m.Len()) + +m.ForEach(func(k int64, v int64) { + fmt.Printf("key: %d, value: %d\n", k, v) +}) + +m.Clear() // all gone, but buffers kept +``` diff --git a/vendor/github.com/kamstrup/intmap/map64.go b/vendor/github.com/kamstrup/intmap/map64.go new file mode 100644 index 0000000000..ecc539502e --- /dev/null +++ b/vendor/github.com/kamstrup/intmap/map64.go @@ -0,0 +1,458 @@ +// Package intmap contains a fast hashmap implementation for maps with keys of any integer type +package intmap + +import ( + "iter" + "math" +) + +// IntKey is a type constraint for values that can be used as keys in Map +type IntKey interface { + ~int | ~uint | ~int64 | ~uint64 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8 | ~uintptr +} + +// pair represents a key-value pair in Map. +// +// It is an important detail that V is before K in the memory layout. Despite it feeling more natural to have K first! +// We must have sizeof(pair[K,struct{}]) == sizeof(K), to minimize memory consumption when using a Set. +// If V is last, then &p.V can point to invalid memory, which is not permitted. This makes the Go compiler emit +// some padding for the pair struct in that case. +// See https://github.com/kamstrup/intmap/pull/6#issuecomment-3581008879 +type pair[K IntKey, V any] struct { + V V + K K +} + +const fillFactorBase64 = 7 +const fillFactor64 = fillFactorBase64 / 10.0 + +func phiMix64(x int) int { + h := int64(x) * int64(0x9E3779B9) + return int(h ^ (h >> 16)) +} + +// Map is a hashmap where the keys are some any integer type. +// It is valid to call methods that read a nil map, similar to a standard Go map. +// Methods valid on a nil map are Has, Get, Len, and ForEach. +type Map[K IntKey, V any] struct { + data []pair[K, V] // key-value pairs + size int + + zeroVal V // value of 'zero' key + hasZeroKey bool // do we have 'zero' key in the map? +} + +// New creates a new map with keys being any integer subtype. +// The map can store up to the given capacity before reallocation and rehashing occurs. +func New[K IntKey, V any](capacity int) *Map[K, V] { + return &Map[K, V]{ + data: make([]pair[K, V], arraySize(capacity, fillFactor64)), + } +} + +// Has checks if the given key exists in the map. +// Calling this method on a nil map will return false. +func (m *Map[K, V]) Has(key K) bool { + if m == nil { + return false + } + + if key == K(0) { + return m.hasZeroKey + } + + idx := m.startIndex(key) + p := m.data[idx] + + if p.K == K(0) { // end of chain already + return false + } + if p.K == key { // we check zero prior to this call + return true + } + + // hash collision, seek next hash match, bailing on first empty + for { + idx = m.nextIndex(idx) + p = m.data[idx] + if p.K == K(0) { + return false + } + if p.K == key { + return true + } + } +} + +// Get returns the value if the key is found. +// If you just need to check for existence it is easier to use Has. +// Calling this method on a nil map will return the zero value for V and false. +func (m *Map[K, V]) Get(key K) (V, bool) { + if m == nil { + var zero V + return zero, false + } + + if key == K(0) { + if m.hasZeroKey { + return m.zeroVal, true + } + var zero V + return zero, false + } + + idx := m.startIndex(key) + p := m.data[idx] + + if p.K == K(0) { // end of chain already + var zero V + return zero, false + } + if p.K == key { // we check zero prior to this call + return p.V, true + } + + // hash collision, seek next hash match, bailing on first empty + for { + idx = m.nextIndex(idx) + p = m.data[idx] + if p.K == K(0) { + var zero V + return zero, false + } + if p.K == key { + return p.V, true + } + } +} + +// Put adds or updates key with value val. +func (m *Map[K, V]) Put(key K, val V) { + if key == K(0) { + if !m.hasZeroKey { + m.size++ + } + m.zeroVal = val + m.hasZeroKey = true + return + } + + idx := m.startIndex(key) + p := &m.data[idx] + + if p.K == K(0) { // end of chain already + p.K = key + p.V = val + if m.size >= m.sizeThreshold() { + m.rehash() + } else { + m.size++ + } + return + } else if p.K == key { // overwrite existing value + p.V = val + return + } + + // hash collision, seek next empty or key match + for { + idx = m.nextIndex(idx) + p = &m.data[idx] + + if p.K == K(0) { + p.K = key + p.V = val + if m.size >= m.sizeThreshold() { + m.rehash() + } else { + m.size++ + } + return + } else if p.K == key { + p.V = val + return + } + } +} + +// PutIfNotExists adds the key-value pair only if the key does not already exist +// in the map, and returns the current value associated with the key and a boolean +// indicating whether the value was newly added or not. +func (m *Map[K, V]) PutIfNotExists(key K, val V) (V, bool) { + if key == K(0) { + if m.hasZeroKey { + return m.zeroVal, false + } + m.zeroVal = val + m.hasZeroKey = true + m.size++ + return val, true + } + + idx := m.startIndex(key) + p := &m.data[idx] + + if p.K == K(0) { // end of chain already + p.K = key + p.V = val + m.size++ + if m.size >= m.sizeThreshold() { + m.rehash() + } + return val, true + } else if p.K == key { + return p.V, false + } + + // hash collision, seek next hash match, bailing on first empty + for { + idx = m.nextIndex(idx) + p = &m.data[idx] + + if p.K == K(0) { + p.K = key + p.V = val + m.size++ + if m.size >= m.sizeThreshold() { + m.rehash() + } + return val, true + } else if p.K == key { + return p.V, false + } + } +} + +// ForEach iterates through key-value pairs in the map while the function f returns true. +// This method returns immediately if invoked on a nil map. +// +// The iteration order of a Map is not defined, so please avoid relying on it. +func (m *Map[K, V]) ForEach(f func(K, V) bool) { + if m == nil { + return + } + + if m.hasZeroKey && !f(K(0), m.zeroVal) { + return + } + forEach64(m.data, f) +} + +// All returns an iterator over key-value pairs from m. +// The iterator returns immediately if invoked on a nil map. +// +// The iteration order of a Map is not defined, so please avoid relying on it. +func (m *Map[K, V]) All() iter.Seq2[K, V] { + return m.ForEach +} + +// Keys returns an iterator over keys in m. +// The iterator returns immediately if invoked on a nil map. +// +// The iteration order of a Map is not defined, so please avoid relying on it. +func (m *Map[K, V]) Keys() iter.Seq[K] { + return func(yield func(k K) bool) { + if m == nil { + return + } + + if m.hasZeroKey && !yield(K(0)) { + return + } + + for _, p := range m.data { + if p.K != K(0) && !yield(p.K) { + return + } + } + } +} + +// Values returns an iterator over values in m. +// The iterator returns immediately if invoked on a nil map. +// +// The iteration order of a Map is not defined, so please avoid relying on it. +func (m *Map[K, V]) Values() iter.Seq[V] { + return func(yield func(v V) bool) { + if m == nil { + return + } + + if m.hasZeroKey && !yield(m.zeroVal) { + return + } + + for _, p := range m.data { + if p.K != K(0) && !yield(p.V) { + return + } + } + } +} + +// Clear removes all items from the map, but keeps the internal buffers for reuse. +func (m *Map[K, V]) Clear() { + var zero V + m.hasZeroKey = false + m.zeroVal = zero + + // compiles down to runtime.memclr() + for i := range m.data { + m.data[i] = pair[K, V]{} + } + + m.size = 0 +} + +func (m *Map[K, V]) rehash() { + oldData := m.data + m.data = make([]pair[K, V], 2*len(m.data)) + + // reset size + if m.hasZeroKey { + m.size = 1 + } else { + m.size = 0 + } + + forEach64(oldData, func(k K, v V) bool { + m.Put(k, v) + return true + }) +} + +// Len returns the number of elements in the map. +// The length of a nil map is defined to be zero. +func (m *Map[K, V]) Len() int { + if m == nil { + return 0 + } + + return m.size +} + +func (m *Map[K, V]) sizeThreshold() int { + return int(uint64(len(m.data)) * fillFactorBase64 / 10) +} + +func (m *Map[K, V]) startIndex(key K) int { + return startIndex(int(key), len(m.data)) +} + +func (m *Map[K, V]) nextIndex(idx int) int { + return nextIndex(idx, len(m.data)) +} + +func forEach64[K IntKey, V any](pairs []pair[K, V], f func(k K, v V) bool) { + for _, p := range pairs { + if p.K != K(0) && !f(p.K, p.V) { + return + } + } +} + +// Del deletes a key and its value, returning true iff the key was found +func (m *Map[K, V]) Del(key K) bool { + if key == K(0) { + if m.hasZeroKey { + m.hasZeroKey = false + m.size-- + return true + } + return false + } + + idx := m.startIndex(key) + p := m.data[idx] + + if p.K == key { + // any keys that were pushed back needs to be shifted nack into the empty slot + // to avoid breaking the chain + m.shiftKeys(idx) + m.size-- + return true + } else if p.K == K(0) { // end of chain already + return false + } + + for { + idx = m.nextIndex(idx) + p = m.data[idx] + + if p.K == key { + // any keys that were pushed back needs to be shifted nack into the empty slot + // to avoid breaking the chain + m.shiftKeys(idx) + m.size-- + return true + } else if p.K == K(0) { + return false + } + + } +} + +func (m *Map[K, V]) shiftKeys(idx int) int { + // Shift entries with the same hash. + // We need to do this on deletion to ensure we don't have zeroes in the hash chain + for { + var p pair[K, V] + lastIdx := idx + idx = m.nextIndex(idx) + for { + p = m.data[idx] + if p.K == K(0) { + m.data[lastIdx] = pair[K, V]{} + return lastIdx + } + + slot := m.startIndex(p.K) + if lastIdx <= idx { + if lastIdx >= slot || slot > idx { + break + } + } else { + if lastIdx >= slot && slot > idx { + break + } + } + idx = m.nextIndex(idx) + } + m.data[lastIdx] = p + } +} + +func nextPowerOf2(x uint32) uint32 { + if x == math.MaxUint32 { + return x + } + + if x == 0 { + return 1 + } + + x-- + x |= x >> 1 + x |= x >> 2 + x |= x >> 4 + x |= x >> 8 + x |= x >> 16 + + return x + 1 +} + +func arraySize(exp int, fill float64) int { + s := nextPowerOf2(uint32(math.Ceil(float64(exp) / fill))) + if s < 2 { + s = 2 + } + return int(s) +} + +func startIndex(key, len int) int { + return phiMix64(key) & (len - 1) +} + +func nextIndex(idx, len int) int { + return (idx + 1) & (len - 1) +} diff --git a/vendor/github.com/kamstrup/intmap/set.go b/vendor/github.com/kamstrup/intmap/set.go new file mode 100644 index 0000000000..b81ce224b6 --- /dev/null +++ b/vendor/github.com/kamstrup/intmap/set.go @@ -0,0 +1,59 @@ +package intmap + +import "iter" + +// Set is a specialization of Map modelling a set of integers. +// Like Map, methods that read from the set are valid on the nil Set. +// This include Has, Len, and ForEach. +type Set[K IntKey] Map[K, struct{}] + +// NewSet creates a new Set with a given initial capacity. +func NewSet[K IntKey](capacity int) *Set[K] { + return (*Set[K])(New[K, struct{}](capacity)) +} + +// Add an element to the set. Returns true if the element was not already present. +func (s *Set[K]) Add(k K) bool { + _, found := (*Map[K, struct{}])(s).PutIfNotExists(k, struct{}{}) + return found +} + +// Del deletes a key, returning true iff the key was found +func (s *Set[K]) Del(k K) bool { + return (*Map[K, struct{}])(s).Del(k) +} + +// Clear removes all items from the Set, but keeps the internal buffers for reuse. +func (s *Set[K]) Clear() { + (*Map[K, struct{}])(s).Clear() +} + +// Has returns true if the key is in the set. +// If the set is nil this method always return false. +func (s *Set[K]) Has(k K) bool { + return (*Map[K, struct{}])(s).Has(k) +} + +// Len returns the number of elements in the set. +// If the set is nil this method return 0. +func (s *Set[K]) Len() int { + return (*Map[K, struct{}])(s).Len() +} + +// ForEach iterates over the elements in the set while the visit function returns true. +// This method returns immediately if the set is nil. +// +// The iteration order of a Set is not defined, so please avoid relying on it. +func (s *Set[K]) ForEach(visit func(k K) bool) { + (*Map[K, struct{}])(s).ForEach(func(k K, _ struct{}) bool { + return visit(k) + }) +} + +// All returns an iterator over keys from the set. +// The iterator returns immediately if the set is nil. +// +// The iteration order of a Set is not defined, so please avoid relying on it. +func (s *Set[K]) All() iter.Seq[K] { + return s.ForEach +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 91fa9c2d44..4ce4a70ba2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -275,6 +275,9 @@ github.com/aws/smithy-go/tracing github.com/aws/smithy-go/transport/http github.com/aws/smithy-go/transport/http/internal/io github.com/aws/smithy-go/waiter +# github.com/axiomhq/hyperloglog v0.2.6 +## explicit; go 1.23 +github.com/axiomhq/hyperloglog # github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 ## explicit; go 1.20 github.com/bboreham/go-loser @@ -733,6 +736,9 @@ github.com/json-iterator/go # github.com/julienschmidt/httprouter v1.3.0 ## explicit; go 1.7 github.com/julienschmidt/httprouter +# github.com/kamstrup/intmap v0.5.2 +## explicit; go 1.23 +github.com/kamstrup/intmap # github.com/klauspost/compress v1.18.0 ## explicit; go 1.22 github.com/klauspost/compress