diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 0ad7aabf64954..2666ab8822ed9 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -101,6 +101,13 @@ impl InProgressSpillFile { Ok(()) } + pub fn flush(&mut self) -> Result<()> { + if let Some(writer) = &mut self.writer { + writer.flush()?; + } + Ok(()) + } + /// Returns a reference to the in-progress file, if it exists. /// This can be used to get the file path for creating readers before the file is finished. pub fn file(&self) -> Option<&RefCountedTempFile> { diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 59938c3e8cd73..4c93c03b342eb 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -310,6 +310,11 @@ impl IPCStreamWriter { Ok((delta_num_rows, delta_num_bytes)) } + pub fn flush(&mut self) -> Result<()> { + self.writer.flush()?; + Ok(()) + } + /// Finish the writer pub fn finish(&mut self) -> Result<()> { self.writer.finish().map_err(Into::into) diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 89b0276206774..6d931112ad888 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -188,6 +188,19 @@ impl SpillManager { Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) } + + /// Same as `read_spill_as_stream`, but without buffering. 
+ pub fn read_spill_as_stream_unbuffered( + &self, + spill_file_path: RefCountedTempFile, + max_record_batch_memory: Option<usize>, + ) -> Result<SendableRecordBatchStream> { + Ok(Box::pin(cooperative(SpillReaderStream::new( + Arc::clone(&self.schema), + spill_file_path, + max_record_batch_memory, + )))) + } } pub(crate) trait GetSlicedSize { diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 8f7f5212f6c91..1b9d82eaf4506 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -194,6 +194,8 @@ impl SpillPoolWriter { // Append the batch if let Some(ref mut writer) = file_shared.writer { writer.append_batch(batch)?; + // make sure we flush the writer for readers + writer.flush()?; file_shared.batches_written += 1; file_shared.estimated_size += batch_size; } @@ -535,7 +537,11 @@ impl Stream for SpillFile { // Step 2: Lazy-create reader stream if needed if self.reader.is_none() && should_read { if let Some(file) = file { - match self.spill_manager.read_spill_as_stream(file, None) { + // we want this unbuffered because files are actively being written to + match self + .spill_manager + .read_spill_as_stream_unbuffered(file, None) + { Ok(stream) => { self.reader = Some(SpillFileReader { stream,