
Fix Arrow Spill Underrun #20159

Merged
adriangb merged 1 commit into apache:main from pydantic:fix_spill_read_underrun on Feb 6, 2026

Conversation

@cetra3
Contributor

@cetra3 cetra3 commented Feb 5, 2026

Which issue does this PR close?

Rationale for this change

This adjusts the way the spill channel works. Currently we have a spill writer & reader pairing which uses a mutex to coordinate when a file is ready to be read.

What happens is that, because we were using a spawn_buffered call, the read task would race ahead and try to read a file that had not yet been completely written out.

Alongside this, we need to flush each write to the file, as there is a chance that another thread may see stale data.

What changes are included in this PR?

Adds a flush on write, and converts the read task to not buffer reads.
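To illustrate the shape of the fix, here is a minimal Rust sketch. It is not the actual DataFusion spill code; the SpillWriter/SpillReader types and their methods are made up for illustration. The writer flushes after every appended batch so a concurrent reader never observes partially buffered data, and the reader pulls batches on demand instead of racing ahead with a buffered read task.

```rust
// Minimal sketch of the two changes (hypothetical types, not DataFusion's code):
// 1. flush the writer after appending, so a concurrent reader never observes
//    a partially buffered batch, and
// 2. read batches on demand instead of racing ahead with a buffered read task.
use std::fs::File;
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};

/// Hypothetical writer wrapping a buffered spill file.
struct SpillWriter {
    inner: BufWriter<File>,
}

impl SpillWriter {
    fn append_batch(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        // Write the encoded batch...
        self.inner.write_all(bytes)?;
        // ...and flush immediately so a reader of the same file sees it.
        self.inner.flush()
    }
}

/// Hypothetical reader that pulls one batch at a time instead of buffering ahead.
struct SpillReader {
    file: File,
    offset: u64,
}

impl SpillReader {
    /// Read exactly `len` bytes; callers only ask for data the writer has
    /// already flushed, so the read never underruns the file.
    fn read_batch(&mut self, len: usize) -> std::io::Result<Vec<u8>> {
        self.file.seek(SeekFrom::Start(self.offset))?;
        let mut buf = vec![0u8; len];
        self.file.read_exact(&mut buf)?;
        self.offset += len as u64;
        Ok(buf)
    }
}
```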

Are these changes tested?

I haven't written a test, but I have been running the example in the attached issue. While it now fails with allocation errors, the original error goes away.

Are there any user-facing changes?

Nope

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Feb 5, 2026
@alamb
Contributor

alamb commented Feb 5, 2026

I tried the reproducer and now it fails like this

./target/profiling/datafusion-cli  -m 1G -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;"
DataFusion CLI v52.1.0
Error: Execution error: (Hint: you may increase the file descriptor limit with shell command 'ulimit -n 4096') Failed to create partition file at "/var/folders/1l/tg68jc6550gg8xqf1hr4mlwr0000gn/T/.tmp2YLiiK/.tmp8w9Wx2": Os { code: 24, kind: Uncategorized, message: "Too many open files" }

However, when I cranked the ulimit via

ulimit -n 10000

Then it fails like this

3:~/Software/datafusion$ ./target/profiling/datafusion-cli  -m 1G -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;"
DataFusion CLI v52.1.0
Error: Resources exhausted: Additional allocation failed for TopK[7] with top memory consumers (across reservations) as:
  GroupedHashAggregateStream[7] (count(1))#48(can spill: true) consumed 83.8 MB, peak 83.8 MB,
  GroupedHashAggregateStream[4] (count(1))#26(can spill: true) consumed 83.8 MB, peak 83.8 MB,
  GroupedHashAggregateStream[15] (count(1))#64(can spill: true) consumed 79.1 MB, peak 79.1 MB.
Error: Failed to allocate additional 2.3 MB for TopK[7] with 7.4 MB already allocated for this reservation - 334.3 KB remain available for the total pool

That being said, this seems like an improvement to me

Contributor

@alamb alamb left a comment

Thank you @cetra3

FYI @2010YOUY01, as you may be interested in this fix

@alamb alamb requested a review from adriangb February 5, 2026 22:44
// Append the batch
if let Some(ref mut writer) = file_shared.writer {
    writer.append_batch(batch)?;
    // make sure we flush the writer for readers
Contributor

I do wonder if it is ok to flush each batch (will this result in too many small file IOs?)

Contributor Author

We need to do this, or otherwise ensure the data is written before we try reading it. Maybe we could flush periodically rather than on each batch.
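For what it's worth, a periodic flush could look roughly like the sketch below. This is a hypothetical illustration, not code from this PR; FLUSH_EVERY and PeriodicFlushWriter are invented names, and the reader would then have to consume only data up to the last flush.

```rust
// Rough sketch of the "flush periodically" idea (hypothetical, not from this PR).
use std::fs::File;
use std::io::{BufWriter, Write};

const FLUSH_EVERY: usize = 8; // hypothetical: flush after every 8 batches

struct PeriodicFlushWriter {
    inner: BufWriter<File>,
    since_flush: usize,
}

impl PeriodicFlushWriter {
    fn append_batch(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(bytes)?;
        self.since_flush += 1;
        if self.since_flush >= FLUSH_EVERY {
            // Amortize the file IO: only flush once per FLUSH_EVERY batches.
            // Readers must then only be handed offsets up to the last flush.
            self.inner.flush()?;
            self.since_flush = 0;
        }
        Ok(())
    }
}
```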

@alamb
Contributor

alamb commented Feb 5, 2026

I got it to finish!

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ ./target/profiling/datafusion-cli  -m 1G -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;"
DataFusion CLI v52.1.0
+---------------------+----+--------------+----------+
| UserID              | m  | SearchPhrase | count(*) |
+---------------------+----+--------------+----------+
| 1313338681122956954 | 31 |              | 589      |
| 1313338681122956954 | 28 |              | 578      |
| 1313338681122956954 | 29 |              | 572      |
| 1313338681122956954 | 33 |              | 567      |
| 1313338681122956954 | 27 |              | 557      |
| 1313338681122956954 | 32 |              | 554      |
| 1313338681122956954 | 30 |              | 552      |
| 1313338681122956954 | 34 |              | 546      |
| 1313338681122956954 | 26 |              | 540      |
| 1313338681122956954 | 10 |              | 539      |
+---------------------+----+--------------+----------+
10 row(s) fetched.
Elapsed 233.198 seconds.

That is a crazy long time though

For comparison, without a memory limit it takes 1s

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ ./target/profiling/datafusion-cli  -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;"
DataFusion CLI v52.1.0
+---------------------+----+--------------+----------+
| UserID              | m  | SearchPhrase | count(*) |
+---------------------+----+--------------+----------+
| 1313338681122956954 | 31 |              | 589      |
| 1313338681122956954 | 28 |              | 578      |
| 1313338681122956954 | 29 |              | 572      |
| 1313338681122956954 | 33 |              | 567      |
| 1313338681122956954 | 27 |              | 557      |
| 1313338681122956954 | 32 |              | 554      |
| 1313338681122956954 | 30 |              | 552      |
| 1313338681122956954 | 34 |              | 546      |
| 1313338681122956954 | 26 |              | 540      |
| 1313338681122956954 | 10 |              | 539      |
+---------------------+----+--------------+----------+
10 row(s) fetched.
Elapsed 1.330 seconds.

@adriangb
Contributor

adriangb commented Feb 5, 2026

I'd guess a file IO per batch is okay. A file per batch probably not (that's what all of this complexity tries to avoid). But it would be interesting to (1) confirm this PR doesn't make other spilling queries slower and (2) investigate why the spilling version is so slow (can be done outside of this PR)

@adriangb
Contributor

adriangb commented Feb 6, 2026

@alamb Wdyt of adding a benchmark that forces spilling? Or an option to the current script to run with a memory limit?

Adjust formatting
@cetra3 cetra3 force-pushed the fix_spill_read_underrun branch from 3314b1c to 572cd33 on February 6, 2026 00:50
Contributor

@adriangb adriangb left a comment

I think this is good to merge as is since it fixes a real bug. But it would be great if we could see some benchmarks. I discussed with @cetra3; he's going to run them.

@cetra3
Contributor Author

cetra3 commented Feb 6, 2026

OK, I have tried to get some benchmark results using disk spilling and the TPC-H benchmark. With the memory limit set to 1G, I can successfully run this on this branch every time. On main it fails a lot with the arrow error, though I managed to get it to work once or twice. However, it looks like Query 21 is the only query where any spilling happens.

On this branch:

Query 21 iteration 0 took 1596.5 ms and returned 100 rows
Query 21 iteration 1 took 2269.5 ms and returned 100 rows
Query 21 iteration 2 took 2515.2 ms and returned 100 rows
Query 21 iteration 3 took 2907.9 ms and returned 100 rows
Query 21 iteration 4 took 3504.3 ms and returned 100 rows
Query 21 avg time: 2558.70 ms

On main the best I got was:

Query 21 iteration 0 took 3121.8 ms and returned 100 rows
Query 21 iteration 1 took 2731.7 ms and returned 100 rows
Query 21 iteration 2 took 3314.0 ms and returned 100 rows
Query 21 iteration 3 took 4073.1 ms and returned 100 rows
Query 21 iteration 4 took 5101.1 ms and returned 100 rows
Query 21 avg time: 3668.34 ms

I've attached the JSON of both runs:

main.json
fix_spill_read_underrun.json

@xudong963
Member

I haven't written a test, but I have been running the example in the attached issue. While it now fails with allocation errors, the original error goes away.

Calling for a test; one might be extracted from the example.
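A regression test distilled from the example might look roughly like the sketch below. This is only a sketch: it assumes the RuntimeEnvBuilder::with_memory_limit, SessionContext::new_with_config_rt, and register_parquet APIs from recent DataFusion releases, and the data path, table name, query, and 64 MB limit are placeholders rather than values from this PR.

```rust
// Rough sketch of a regression test that forces spilling with a small memory
// limit. Paths, names, and the limit are illustrative placeholders.
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::test]
async fn group_by_spills_and_completes() -> Result<()> {
    // Cap the memory pool low enough that the aggregation has to spill to disk.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(64 * 1024 * 1024, 1.0)
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);

    // Placeholder dataset; the linked issue uses the partitioned ClickBench hits data.
    ctx.register_parquet("hits", "testdata/hits_partitioned", ParquetReadOptions::default())
        .await?;

    // A high-cardinality GROUP BY similar to the query in the linked issue,
    // which previously failed with "failed to fill whole buffer".
    let batches = ctx
        .sql(
            "SELECT \"UserID\", \"SearchPhrase\", COUNT(*) \
             FROM hits GROUP BY \"UserID\", \"SearchPhrase\" \
             ORDER BY COUNT(*) DESC LIMIT 10",
        )
        .await?
        .collect()
        .await?;

    assert!(!batches.is_empty());
    Ok(())
}
```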

@cetra3
Contributor Author

cetra3 commented Feb 6, 2026

Here are the results from the ClickBench benchmark with a 1GB memory limit. We still hit the resource limits, but the bug doesn't materialize with the change:

fix_spill_read_underrun.log
main.log
clickbench_partitioned.json - fix
clickbench_partitioned.json - main

@adriangb
Contributor

adriangb commented Feb 6, 2026

Here are the results from the ClickBench benchmark with a 1GB memory limit. We still hit the resource limits, but the bug doesn't materialize with the change:

fix_spill_read_underrun.log main.log clickbench_partitioned.json - fix clickbench_partitioned.json - main

Here's the comparison:

┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃ spill-main ┃      spill ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │    1.42 ms │    1.29 ms │ +1.10x faster │
│ QQuery 1  │   12.26 ms │   10.22 ms │ +1.20x faster │
│ QQuery 2  │   24.56 ms │   21.90 ms │ +1.12x faster │
│ QQuery 3  │   54.30 ms │   52.67 ms │     no change │
│ QQuery 4  │  589.73 ms │  544.75 ms │ +1.08x faster │
│ QQuery 5  │       FAIL │       FAIL │  incomparable │
│ QQuery 6  │   22.91 ms │    8.33 ms │ +2.75x faster │
│ QQuery 7  │   13.95 ms │   38.71 ms │  2.78x slower │
│ QQuery 8  │ 1100.09 ms │ 1089.94 ms │     no change │
│ QQuery 9  │  476.05 ms │  443.06 ms │ +1.07x faster │
│ QQuery 10 │  124.00 ms │  115.41 ms │ +1.07x faster │
│ QQuery 11 │  134.52 ms │  122.89 ms │ +1.09x faster │
│ QQuery 12 │       FAIL │       FAIL │  incomparable │
│ QQuery 13 │       FAIL │       FAIL │  incomparable │
│ QQuery 14 │       FAIL │       FAIL │  incomparable │
│ QQuery 15 │ 1013.17 ms │ 1038.87 ms │     no change │
│ QQuery 16 │       FAIL │       FAIL │  incomparable │
│ QQuery 17 │       FAIL │       FAIL │  incomparable │
│ QQuery 18 │       FAIL │       FAIL │  incomparable │
│ QQuery 19 │  136.08 ms │   88.19 ms │ +1.54x faster │
│ QQuery 20 │  439.50 ms │ 1187.21 ms │  2.70x slower │
│ QQuery 21 │  608.01 ms │  765.89 ms │  1.26x slower │
│ QQuery 22 │ 1248.89 ms │ 1256.74 ms │     no change │
│ QQuery 23 │ 6601.82 ms │ 4627.75 ms │ +1.43x faster │
│ QQuery 24 │   68.26 ms │   78.68 ms │  1.15x slower │
│ QQuery 25 │  121.97 ms │  100.81 ms │ +1.21x faster │
│ QQuery 26 │   85.14 ms │   77.47 ms │ +1.10x faster │
│ QQuery 27 │  615.13 ms │  524.61 ms │ +1.17x faster │
│ QQuery 28 │       FAIL │       FAIL │  incomparable │
│ QQuery 29 │  223.85 ms │  201.93 ms │ +1.11x faster │
│ QQuery 30 │  539.42 ms │  495.41 ms │ +1.09x faster │
│ QQuery 31 │ 1123.87 ms │ 1016.04 ms │ +1.11x faster │
│ QQuery 32 │ 5227.16 ms │ 5989.61 ms │  1.15x slower │
│ QQuery 33 │       FAIL │       FAIL │  incomparable │
│ QQuery 34 │       FAIL │       FAIL │  incomparable │
│ QQuery 35 │  999.16 ms │  991.90 ms │     no change │
│ QQuery 36 │       FAIL │  365.41 ms │  incomparable │
│ QQuery 37 │   37.26 ms │   38.01 ms │     no change │
│ QQuery 38 │   58.04 ms │   59.82 ms │     no change │
│ QQuery 39 │  297.38 ms │  562.74 ms │  1.89x slower │
│ QQuery 40 │   19.25 ms │   20.12 ms │     no change │
│ QQuery 41 │   18.30 ms │   17.35 ms │ +1.05x faster │
│ QQuery 42 │   16.78 ms │   14.57 ms │ +1.15x faster │
└───────────┴────────────┴────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary         ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (spill-main)   │ 22052.23ms │
│ Total Time (spill)        │ 21602.90ms │
│ Average Time (spill-main) │   689.13ms │
│ Average Time (spill)      │   675.09ms │
│ Queries Faster            │         18 │
│ Queries Slower            │          6 │
│ Queries with No Change    │          8 │
│ Queries with Failure      │         11 │
└───────────────────────────┴────────────┘

This looks good to me: there are some slowdowns, but overwhelmingly speedups, and failures turning into passes.

@adriangb adriangb added this pull request to the merge queue Feb 6, 2026
Merged via the queue into apache:main with commit 1799c31 Feb 6, 2026
32 checks passed
@adriangb adriangb deleted the fix_spill_read_underrun branch February 6, 2026 11:30

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Running Clickbench query 18 when spilling fails with "failed to fill whole buffer" error

4 participants