Extend server priorities to realtime indexing tasks for query isolation #19040

Open
abhishekrb19 wants to merge 15 commits into apache:master from abhishekrb19:peon_server_priorities

@abhishekrb19 abhishekrb19 commented Feb 20, 2026

Fixes #19018

Currently, by default, every task replica answers all realtime queries regardless of the priority or routing strategy applied on the Broker.
For mixed query workloads, some queries may have higher priority than others, so isolation on the replicas is needed. We've seen bad queries take down some task replicas, causing noisy-neighbor problems.

This extends Druid's query prioritization and routing strategies to Peon servers by letting operators configure how many task replicas run at each server priority, similar to how it works for Historicals and Brokers. This is done by exposing serverPriorityToReplicas in the supervisor's ioConfig, which operators can optionally set to the number of replicas allocated per server priority for realtime indexing tasks.

For example, some replicas can be configured to handle queries of all priorities, while others respond only to specific priority ranges. This isolates certain Peon replicas for certain priorities and leaves others for more exploratory or dashboarding use cases.

Approach:

To support this, a new serverPriorityToReplicas property is added to the supervisor’s ioConfig.
The SeekableStreamSupervisor assigns priorities to SeekableStreamIndexTasks as they are created for a group. Similarly, the priorities are removed from internal bookkeeping when the tasks terminate. The ForkingTaskRunner then passes the appropriate server priority when initializing the Peon server. In the absence of this configuration, Peons continue to run with the default priority 0.

serverPriorityToReplicas is optional and is compatible with the existing replicas property if specified.
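
To make the assignment step concrete, here is a minimal sketch of how a priority-to-replica-count map could be expanded into one server priority per replica of a task group. It is not the code in this PR: the class `PriorityAssignmentSketch`, the method `expand`, and the highest-priority-first ordering are all hypothetical and chosen only for illustration. The one behavior taken from the description is the fallback to priority 0 when the map is not configured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Druid: illustrates the idea only.
final class PriorityAssignmentSketch
{
  /**
   * Expands {server priority -> replica count} into one priority per replica.
   * When the map is absent, every replica gets the default priority 0.
   */
  static List<Integer> expand(Map<Integer, Integer> serverPriorityToReplicas, int replicas)
  {
    if (serverPriorityToReplicas == null) {
      return new ArrayList<>(Collections.nCopies(replicas, 0));
    }
    // Assumption: iterate from the highest priority to the lowest for a stable order.
    final TreeMap<Integer, Integer> sorted = new TreeMap<>(Comparator.reverseOrder());
    sorted.putAll(serverPriorityToReplicas);

    final List<Integer> priorities = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : sorted.entrySet()) {
      for (int i = 0; i < entry.getValue(); i++) {
        priorities.add(entry.getKey());
      }
    }
    return priorities;
  }

  public static void main(String[] args)
  {
    // Two replicas at priority 1 and one at priority 0 for a single task group.
    System.out.println(expand(Map.of(1, 2, 0, 1), 3)); // prints [1, 1, 0]
  }
}
```

Each entry of the returned list would correspond to the `druid.server.priority` of one replica in the group.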

Release note

Added serverPriorityToReplicas parameter to the streaming supervisor specs (kafka/kinesis/rabbit). This allows operators to distribute task replicas across different server priorities for realtime indexing tasks. Similar to historical tiering, this enables query isolation for mixed workload scenarios on the Peons, allowing some task replicas to handle queries of specific priorities.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@abhishekrb19 abhishekrb19 force-pushed the peon_server_priorities branch from 8e47ed3 to eed82ea Compare February 20, 2026 23:27
@abhishekrb19 abhishekrb19 changed the title Extend server priorities to Peon servers Extend server priorities to realtime indexing tasks for query isolation Feb 21, 2026
@abhishekrb19 abhishekrb19 force-pushed the peon_server_priorities branch from 85b9135 to 6a42a18 Compare February 23, 2026 04:46
@abhishekrb19 abhishekrb19 force-pushed the peon_server_priorities branch from 7829e01 to ad0abca Compare February 23, 2026 07:44
EasyMock.reset(spec);
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking SeekableStreamSupervisorSpec.getDataSchema should be avoided because it has been deprecated.
@jtuglu1 jtuglu1 self-requested a review February 24, 2026 06:32

@Nullable
@JsonProperty
public Map<Integer, Integer> getserverPriorityToReplicas()

Suggested change
public Map<Integer, Integer> getserverPriorityToReplicas()
public Map<Integer, Integer> getServerPriorityToReplicas()

if (this.serverPriorityToReplicas != null) {
final int replicaCount = this.serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum();
if (replicas != null && replicas != replicaCount) {
throw InvalidInput.exception(

What do you think about dropping this requirement? Seems like additional configuration overhead since we set it explicitly below anyway

Contributor Author

This would be triggered only if someone unintentionally sets conflicting values that cause ambiguity. I wanted to keep the behavior straightforward for end users rather than document any precedence rules in such cases. I've updated the error message accordingly, please let me know if that makes sense.

@jtuglu1 jtuglu1 left a comment

Did a first pass – general approach LGTM.

|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||
|`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total task replicas - 10 replicas with priority 1 and 5 replicas with priority 0.|No|null|

Suggested change
|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total task replicas - 10 replicas with priority 1 and 5 replicas with priority 0.|No|null|
|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total tasks - 10 tasks with priority 1 and 5 tasks with priority 0.|No|null|

I thought using the word replicas here was a bit confusing since we're referring to task counts here
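
For reference, the arithmetic in the documented example can be checked with a short sketch. The class and method names below are hypothetical; only the multiplication by `taskCount` comes from the doc text under review.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Druid: reproduces the arithmetic from the docs row.
final class ReplicaScalingSketch
{
  /** Multiplies each per-group replica count by taskCount to get total tasks per priority. */
  static Map<Integer, Integer> totalTasksPerPriority(Map<Integer, Integer> serverPriorityToReplicas, int taskCount)
  {
    final Map<Integer, Integer> totals = new TreeMap<>();
    serverPriorityToReplicas.forEach((priority, replicas) -> totals.put(priority, replicas * taskCount));
    return totals;
  }

  public static void main(String[] args)
  {
    // The example from the docs: {"1": 2, "0": 1} with taskCount = 5.
    final Map<Integer, Integer> totals = totalTasksPerPriority(Map.of(1, 2, 0, 1), 5);
    final int sum = totals.values().stream().mapToInt(Integer::intValue).sum();
    System.out.println(totals + " total=" + sum); // prints {0=5, 1=10} total=15
  }
}
```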

this.idleConfig = idleConfig;
this.serverPriorityToReplicas = serverPriorityToReplicas;
if (this.serverPriorityToReplicas != null) {
final int serverPriorityReplicas = this.serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum();

@aho135 aho135 Feb 28, 2026

Something like this would also pass validation

Map.of(
  1, 2,
  0, -1
)

Maybe make this more strict such that any value has to be > 0
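
A stricter check along these lines might look like the sketch below. It is only an illustration of the suggestion, not the PR's code: `ReplicaValidationSketch` and `validate` are hypothetical names, `IllegalArgumentException` stands in for the `InvalidInput.exception(...)` call seen in the diff, and the fallback of 1 when neither setting is present is an assumption.

```java
import java.util.Map;

// Hypothetical helper, not part of Druid: sketches the stricter validation suggested above.
final class ReplicaValidationSketch
{
  /** Returns the effective replica count, rejecting non-positive counts and conflicting totals. */
  static int validate(Integer replicas, Map<Integer, Integer> serverPriorityToReplicas)
  {
    if (serverPriorityToReplicas == null) {
      // Assumption: fall back to a single replica when nothing is configured.
      return replicas == null ? 1 : replicas;
    }
    int sum = 0;
    for (Map.Entry<Integer, Integer> entry : serverPriorityToReplicas.entrySet()) {
      final Integer count = entry.getValue();
      if (count == null || count <= 0) {
        throw new IllegalArgumentException(
            "Replica count for server priority " + entry.getKey() + " must be greater than 0, got " + count
        );
      }
      sum += count;
    }
    // Keep the existing rule from the diff: an explicit replicas value must match the map total.
    if (replicas != null && replicas != sum) {
      throw new IllegalArgumentException(
          "replicas (" + replicas + ") does not match the serverPriorityToReplicas total (" + sum + ")"
      );
    }
    return sum;
  }

  public static void main(String[] args)
  {
    System.out.println(validate(3, Map.of(1, 2, 0, 1))); // prints 3
    try {
      // Sums to 1, so a sum-only check would accept it when replicas is 1 or unset.
      validate(null, Map.of(1, 2, 0, -1));
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking each value before summing is what closes the gap: a negative entry can otherwise cancel out part of a positive one and still produce a plausible total.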
