Extend server priorities to realtime indexing tasks for query isolation #19040

abhishekrb19 wants to merge 15 commits into apache:master
Conversation
Dismissed review thread: .../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
```java
EasyMock.reset(spec);
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
```

Code scanning (CodeQL) notice: Deprecated method or constructor invocation (test)
```java
@Nullable
@JsonProperty
public Map<Integer, Integer> getserverPriorityToReplicas()
```

Suggested change:

```java
public Map<Integer, Integer> getServerPriorityToReplicas()
```
```java
if (this.serverPriorityToReplicas != null) {
  final int replicaCount = this.serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum();
  if (replicas != null && replicas != replicaCount) {
    throw InvalidInput.exception(
```

Reviewer: What do you think about dropping this requirement? It seems like additional configuration overhead, since we set it explicitly below anyway.

Author: This would be triggered only if someone unintentionally sets conflicting values that cause ambiguity. I wanted to keep the behavior straightforward for end users rather than document any precedence rules in such cases. I've updated the error message accordingly; please let me know if that makes sense.
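For context, here is a standalone sketch of the consistency check being discussed (hypothetical class and method names, not the PR's actual code): when both `replicas` and `serverPriorityToReplicas` are set, the per-priority counts must sum to `replicas`.

```java
import java.util.Map;

public class ReplicaConfigCheck
{
  // Hypothetical sketch of the validation under discussion: if both `replicas`
  // and `serverPriorityToReplicas` are configured, the per-priority replica
  // counts must sum to `replicas`, otherwise the configuration is ambiguous.
  public static int validatedReplicaCount(Integer replicas, Map<Integer, Integer> serverPriorityToReplicas)
  {
    if (serverPriorityToReplicas == null) {
      // Fall back to the plain `replicas` setting (a default of 1 is assumed here).
      return replicas == null ? 1 : replicas;
    }
    final int replicaCount = serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum();
    if (replicas != null && replicas != replicaCount) {
      throw new IllegalArgumentException(
          "Conflicting replica configuration: replicas=" + replicas + " but priority counts sum to " + replicaCount
      );
    }
    return replicaCount;
  }
}
```

Either value alone is unambiguous; the exception fires only when both are present and disagree, which matches the author's rationale above.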
jtuglu1 left a comment: Did a first pass – general approach LGTM.
Resolved review threads:
- ...xing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
- ...va/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java (outdated)
- ...va/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java (outdated)
- ...va/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
|Property|Type|Description|Required|Default|
|---|---|---|---|---|
|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No||
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||
|`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total task replicas - 10 replicas with priority 1 and 5 replicas with priority 0.|No|null|

Suggested change: in the final example of the `serverPriorityToReplicas` row, say "results in 15 total tasks - 10 tasks with priority 1 and 5 tasks with priority 0" instead of "15 total task replicas - 10 replicas with priority 1 and 5 replicas with priority 0".

I thought using the word "replicas" here was a bit confusing, since we're referring to task counts here.
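For illustration, a minimal supervisor spec fragment using the configuration from the example above might look like the following (the topic name is a placeholder, and most other `ioConfig` fields are omitted):

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "example-topic",
      "taskCount": 5,
      "serverPriorityToReplicas": {
        "1": 2,
        "0": 1
      }
    }
  }
}
```

Per the docs above, each of the 5 task groups would get 2 replicas at `druid.server.priority=1` and 1 replica at priority 0, for 15 tasks in total.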
```java
this.idleConfig = idleConfig;
this.serverPriorityToReplicas = serverPriorityToReplicas;
if (this.serverPriorityToReplicas != null) {
  final int serverPriorityReplicas = this.serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum();
```

Reviewer: Something like this would also pass validation:

```java
Map.of(
    1, 2,
    0, -1
)
```

Maybe make this more strict, such that any value has to be > 0.
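A sketch of the stricter validation suggested above (hypothetical class and method names, not the PR's actual code): every per-priority replica count must be strictly positive, so negative entries cannot cancel out in a sum-based check.

```java
import java.util.Map;

public class PriorityValueCheck
{
  // Hypothetical sketch of the stricter check suggested above: every replica
  // count in the map must be strictly positive, so a map like {1: 2, 0: -1}
  // is rejected instead of slipping past a sum-only validation.
  public static void validatePositiveReplicas(Map<Integer, Integer> serverPriorityToReplicas)
  {
    for (Map.Entry<Integer, Integer> entry : serverPriorityToReplicas.entrySet()) {
      if (entry.getValue() == null || entry.getValue() <= 0) {
        throw new IllegalArgumentException(
            "Replica count for priority [" + entry.getKey() + "] must be > 0, but was [" + entry.getValue() + "]"
        );
      }
    }
  }
}
```

The null check covers maps built by JSON deserialization, where explicit nulls are possible even though `Map.of` itself rejects them.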
Fixes #19018

Currently, all task replicas by default answer all realtime queries, regardless of the priority or routing strategy applied on the Broker. For mixed query workloads, some queries may have higher priority than others, so isolation on the replicas is needed. We've seen bad queries take down some task replicas, causing noisy neighbor problems.

This PR extends Druid's query prioritization and routing strategies to Peon servers by letting operators configure how many replicas to run per server priority, similar to how it works for Historicals and Brokers. This is done by exposing `serverPriorityToReplicas` in the supervisor's `ioConfig`, which operators can optionally configure with the number of replicas allocated per server priority for realtime indexing tasks. For example, some replicas can be configured to handle queries of all priorities, while others respond only to specific priority ranges. This isolates certain Peon replicas for certain priorities and reserves others for more exploratory / dashboarding use cases.

Approach:

To support this, a new `serverPriorityToReplicas` property is added to the supervisor's `ioConfig`. The `SeekableStreamSupervisor` assigns priorities to `SeekableStreamIndexTask`s as they are created for a group; likewise, they are removed from internal bookkeeping when the tasks terminate. The `ForkingTaskRunner` then passes the appropriate server priority when initializing the Peon server. In the absence of this configuration, Peons continue to run with the default priority of 0. `serverPriorityToReplicas` is optional and, if specified, is compatible with the existing `replicas` property.

Release note

Added a `serverPriorityToReplicas` parameter to the streaming supervisor specs (kafka/kinesis/rabbit). This allows operators to distribute task replicas across different server priorities for realtime indexing tasks. Similar to Historical tiering, this enables query isolation for mixed workload scenarios on the Peons, allowing some task replicas to handle queries of specific priorities.

This PR has:
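The replica-to-priority bookkeeping described in the approach section can be sketched as follows (a hypothetical helper, not the PR's actual implementation): expand the `serverPriorityToReplicas` map into one priority value per replica slot of a task group.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PriorityAssignmentSketch
{
  // Hypothetical illustration of the supervisor-side bookkeeping: expand
  // serverPriorityToReplicas into one priority value per replica slot of a
  // task group, higher priorities first. The real supervisor additionally
  // tracks task IDs and frees slots when tasks terminate.
  public static List<Integer> prioritiesForTaskGroup(Map<Integer, Integer> serverPriorityToReplicas)
  {
    final TreeMap<Integer, Integer> byPriorityDesc = new TreeMap<>(Comparator.reverseOrder());
    byPriorityDesc.putAll(serverPriorityToReplicas);

    final List<Integer> priorities = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : byPriorityDesc.entrySet()) {
      for (int i = 0; i < entry.getValue(); i++) {
        priorities.add(entry.getKey());
      }
    }
    return priorities;
  }
}
```

With `{"1": 2, "0": 1}`, each task group's replicas get priorities [1, 1, 0]; with `taskCount` 5 that yields 15 tasks overall, matching the documentation example.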