diff --git a/native-engine/auron-memmgr/src/spill.rs b/native-engine/auron-memmgr/src/spill.rs
index 0ed193839..ada682264 100644
--- a/native-engine/auron-memmgr/src/spill.rs
+++ b/native-engine/auron-memmgr/src/spill.rs
@@ -166,10 +166,10 @@ impl Drop for FileSpill {
         self.1
             .disk_spill_iotime
             .add_duration(Duration::from_nanos(self.1.mem_spill_iotime.value() as u64));
-        if let Some(file_path) = &self.2 {
-            if let Err(e) = fs::remove_file(file_path) {
-                warn!("Was unable to delete spill file: {file_path}. error: {e}");
-            }
+        if let Some(file_path) = &self.2
+            && let Err(e) = fs::remove_file(file_path)
+        {
+            warn!("Was unable to delete spill file: {file_path}. error: {e}");
         }
     }
 }
diff --git a/native-engine/datafusion-ext-commons/src/hash/xxhash.rs b/native-engine/datafusion-ext-commons/src/hash/xxhash.rs
index 31bbab07f..d6acd7d9a 100644
--- a/native-engine/datafusion-ext-commons/src/hash/xxhash.rs
+++ b/native-engine/datafusion-ext-commons/src/hash/xxhash.rs
@@ -98,7 +98,7 @@ fn xxh64_merge_round(mut hash: u64, acc: u64) -> u64 {
 
 #[inline]
 fn xxh_rotl64(value: u64, amt: i32) -> u64 {
-    (value << (amt % 64)) | (value >> (64 - amt % 64))
+    value.rotate_left((amt as u32) % 64)
 }
 
 #[inline]
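The `xxh_rotl64` hunk above replaces a hand-rolled shift-or rotation with `u64::rotate_left`, which is well-defined for every amount and compiles to a single rotate instruction. The manual formula is only correct when `amt % 64 != 0`: at zero it computes `value >> 64`, an overflowing shift that panics in debug builds. A minimal standalone check of the equivalence (names here are illustrative, not from the codebase):

```rust
fn manual_rotl64(value: u64, amt: i32) -> u64 {
    // The pre-change formula: correct only when amt % 64 != 0,
    // because `value >> 64` is an overflowing shift.
    (value << (amt % 64)) | (value >> (64 - amt % 64))
}

fn main() {
    let v = 0x0123_4567_89ab_cdefu64;
    for amt in 1..64 {
        assert_eq!(manual_rotl64(v, amt), v.rotate_left(amt as u32));
    }
    // rotate_left also handles the amt % 64 == 0 case that the
    // manual version cannot express without overflowing:
    assert_eq!(v.rotate_left(0), v);
    println!("rotations agree");
}
```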
diff --git a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
index 2de3c62cd..1b96c30be 100644
--- a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
@@ -750,7 +750,7 @@ mod test {
                 "| [6, 7]    | [6, 7]    |",
                 "+-----------+-----------+"
             ],
-            &[batch.clone()]
+            std::slice::from_ref(&batch)
         );
 
         // test read after write
diff --git a/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
index c774c7453..ee6d29b47 100644
--- a/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
+++ b/native-engine/datafusion-ext-commons/src/io/ipc_compression.rs
@@ -336,8 +336,8 @@ mod tests {
         let test_array2: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
         let schema = Arc::new(Schema::new(vec![Field::new("", DataType::Utf8, false)]));
 
-        writer.write_batch(2, &[test_array1.clone()])?;
-        writer.write_batch(2, &[test_array2.clone()])?;
+        writer.write_batch(2, std::slice::from_ref(&test_array1))?;
+        writer.write_batch(2, std::slice::from_ref(&test_array2))?;
         writer.finish_current_buf()?;
 
         let mut reader = IpcCompressionReader::new(Cursor::new(buf));
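The many `&[x.clone()]` → `std::slice::from_ref(&x)` changes in this patch all follow the same clippy suggestion: when a call only needs to read a one-element slice, there is no reason to clone the value into a temporary array; `from_ref` reinterprets a single reference as a `&[T]` of length one. A small illustration with an assumed `Arc<String>` payload (not code from this repo):

```rust
use std::sync::Arc;

fn total_len(items: &[Arc<String>]) -> usize {
    items.iter().map(|s| s.len()).sum()
}

fn main() {
    let value = Arc::new(String::from("hello"));

    // Before: clones the value just to build a temporary one-element
    // array (cheap for Arc, wasteful for larger types).
    let a = total_len(&[value.clone()]);

    // After: borrows the existing value as a length-one slice, no clone.
    let b = total_len(std::slice::from_ref(&value));

    assert_eq!(a, b);
}
```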
diff --git a/native-engine/datafusion-ext-commons/src/lib.rs b/native-engine/datafusion-ext-commons/src/lib.rs
index 8491c379d..fb94bc2de 100644
--- a/native-engine/datafusion-ext-commons/src/lib.rs
+++ b/native-engine/datafusion-ext-commons/src/lib.rs
@@ -16,7 +16,6 @@
 #![allow(internal_features)]
 #![feature(core_intrinsics)]
 #![feature(slice_swap_unchecked)]
-#![feature(vec_into_raw_parts)]
 
 use auron_jni_bridge::conf::{
     BATCH_SIZE, IntConf, SUGGESTED_BATCH_MEM_SIZE, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE,
@@ -141,10 +140,10 @@ macro_rules! assume {
 macro_rules! prefetch_read_data {
     ($e:expr) => {{
         // safety: use prefetch
-        let locality = 3;
+        const LOCALITY: i32 = 3;
         #[allow(unused_unsafe)]
         unsafe {
-            std::intrinsics::prefetch_read_data($e, locality)
+            std::intrinsics::prefetch_read_data::<_, { LOCALITY }>($e)
         }
     }};
 }
@@ -152,10 +151,10 @@ macro_rules! prefetch_write_data {
     ($e:expr) => {{
         // safety: use prefetch
-        let locality = 3;
+        const LOCALITY: i32 = 3;
         #[allow(unused_unsafe)]
         unsafe {
-            std::intrinsics::prefetch_write_data($e, locality)
+            std::intrinsics::prefetch_write_data::<_, { LOCALITY }>($e)
         }
     }};
 }
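The prefetch macros change because the nightly `core::intrinsics` prefetch functions now take the locality level as a const generic parameter rather than a runtime argument, which is part of why the toolchain bump at the end of this diff is needed. A minimal sketch of the new calling convention, assuming the post-change nightly signature shown in the hunk above (nightly-only; will not build on stable):

```rust
// Requires a recent nightly; locality 3 means "keep in all cache levels".
#![allow(internal_features)]
#![feature(core_intrinsics)]

fn main() {
    let data = vec![1u64, 2, 3, 4];
    // Locality is now a const generic, so an out-of-range level becomes
    // a compile-time error instead of a runtime value.
    unsafe {
        std::intrinsics::prefetch_read_data::<_, 3>(data.as_ptr());
    }
    println!("sum = {}", data.iter().sum::<u64>());
}
```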
diff --git a/native-engine/datafusion-ext-commons/src/spark_hash.rs b/native-engine/datafusion-ext-commons/src/spark_hash.rs
index 097ae2b24..a1439e808 100644
--- a/native-engine/datafusion-ext-commons/src/spark_hash.rs
+++ b/native-engine/datafusion-ext-commons/src/spark_hash.rs
@@ -467,7 +467,7 @@
         ])) as ArrayRef;
 
         // generated with Murmur3Hash(Seq(Literal(1L)), 42).eval() since Spark is tested
-        let hashes = create_murmur3_hashes(5, &[i.clone()], 42);
+        let hashes = create_murmur3_hashes(5, std::slice::from_ref(&i), 42);
         let expected: Vec<i32> = [
             0x99f0149d_u32,
             0x9c67b85d,
@@ -482,7 +482,7 @@
 
         // generated with XxHash64(Seq(Literal(1L)), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_xxhash64_hashes(5, &[i.clone()], 42);
+        let hashes = create_xxhash64_hashes(5, std::slice::from_ref(&i), 42);
         let expected = vec![
             -7001672635703045582,
             -5252525462095825812,
@@ -495,11 +495,11 @@
 
     #[test]
     fn test_str() {
-        let i = Arc::new(StringArray::from(vec!["hello", "bar", "", "😁", "天地"]));
+        let i = Arc::new(StringArray::from(vec!["hello", "bar", "", "😁", "天地"])) as ArrayRef;
 
         // generated with Murmur3Hash(Seq(Literal("")), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_murmur3_hashes(5, &[i.clone()], 42);
+        let hashes = create_murmur3_hashes(5, std::slice::from_ref(&i), 42);
         let expected: Vec<i32> = [3286402344_u32, 2486176763, 142593372, 885025535, 2395000894]
             .into_iter()
             .map(|v| v as i32)
@@ -508,7 +508,7 @@
 
         // generated with XxHash64(Seq(Literal("")), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_xxhash64_hashes(5, &[i.clone()], 42);
+        let hashes = create_xxhash64_hashes(5, std::slice::from_ref(&i), 42);
         let expected = vec![
             -4367754540140381902,
             -1798770879548125814,
@@ -541,7 +541,7 @@
         let array_ref = Arc::new(list_array) as ArrayRef;
 
         // Test Murmur3 hash
-        let hashes = create_murmur3_hashes(3, &[array_ref.clone()], 42);
+        let hashes = create_murmur3_hashes(3, std::slice::from_ref(&array_ref), 42);
         assert_eq!(hashes, vec![-222940379, -374492525, -331964951]);
         Ok(())
     }
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 800b0e530..ef6f9d32a 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -64,11 +64,8 @@ pub fn spark_quarter(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         .expect("date_part(Month) must return Int32Array");
 
     // Compute quarter: ((month - 1) / 3) + 1, preserving NULLs
-    let quarter = Int32Array::from_iter(
-        month_arr
-            .iter()
-            .map(|opt_m| opt_m.map(|m| ((m - 1) / 3 + 1))),
-    );
+    let quarter =
+        Int32Array::from_iter(month_arr.iter().map(|opt_m| opt_m.map(|m| (m - 1) / 3 + 1)));
 
     Ok(ColumnarValue::Array(Arc::new(quarter)))
 }
diff --git a/native-engine/datafusion-ext-functions/src/spark_get_json_object.rs b/native-engine/datafusion-ext-functions/src/spark_get_json_object.rs
index 3af47fd34..70a1be76a 100644
--- a/native-engine/datafusion-ext-functions/src/spark_get_json_object.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_get_json_object.rs
@@ -305,15 +305,15 @@ impl HiveGetJsonObjectEvaluator {
         json_str: &str,
     ) -> std::result::Result<Option<String>, HiveGetJsonObjectError> {
         // first try parsing with sonic-rs and fail-backing to serde-json
-        if let Ok(root_value) = sonic_rs::from_str::<sonic_rs::Value>(json_str) {
-            if let Ok(v) = self.evaluate_with_value_sonic(&root_value) {
-                return Ok(v);
-            }
+        if let Ok(root_value) = sonic_rs::from_str::<sonic_rs::Value>(json_str)
+            && let Ok(v) = self.evaluate_with_value_sonic(&root_value)
+        {
+            return Ok(v);
         }
-        if let Ok(root_value) = serde_json::from_str::<serde_json::Value>(json_str) {
-            if let Ok(v) = self.evaluate_with_value_serde_json(&root_value) {
-                return Ok(v);
-            }
+        if let Ok(root_value) = serde_json::from_str::<serde_json::Value>(json_str)
+            && let Ok(v) = self.evaluate_with_value_serde_json(&root_value)
+        {
+            return Ok(v);
         }
         Err(HiveGetJsonObjectError::InvalidInput)
     }
diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs
index 43b1f136f..cb10924fa 100644
--- a/native-engine/datafusion-ext-functions/src/spark_strings.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs
@@ -216,10 +216,10 @@ pub fn string_concat_ws(args: &[ColumnarValue]) -> Result<ColumnarValue> {
                 if let Ok(s) = as_string_array(&array).cloned() {
                     return Ok(Arg::Array(s));
                 }
-                if let Ok(l) = as_list_array(&array).cloned() {
-                    if l.value_type() == DataType::Utf8 {
-                        return Ok(Arg::List(l));
-                    }
+                if let Ok(l) = as_list_array(&array).cloned()
+                    && l.value_type() == DataType::Utf8
+                {
+                    return Ok(Arg::List(l));
                 }
             }
             ColumnarValue::Scalar(scalar) => {
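Most hunks in this patch are the same mechanical rewrite: nested `if let { if cond { ... } }` blocks collapse into a single `if let ... && ...` let-chain, which the bumped toolchain accepts (let-chains are stable on the 2024 edition). The `spark_get_json_object.rs` hunk shows why this reads better: the try-sonic-then-serde fallback becomes one condition per parser instead of two nesting levels. A generic before/after sketch with illustrative names only:

```rust
// Before: two levels of nesting for one logical condition.
fn first_even_nested(items: &[Option<i32>]) -> Option<i32> {
    for item in items {
        if let Some(v) = item {
            if v % 2 == 0 {
                return Some(*v);
            }
        }
    }
    None
}

// After: a let-chain keeps the happy path at one indentation level.
fn first_even_chained(items: &[Option<i32>]) -> Option<i32> {
    for item in items {
        if let Some(v) = item
            && v % 2 == 0
        {
            return Some(*v);
        }
    }
    None
}

fn main() {
    let items = [None, Some(3), Some(8)];
    assert_eq!(first_even_nested(&items), first_even_chained(&items));
}
```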
diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs
index bea1c05ec..dad973dbe 100644
--- a/native-engine/datafusion-ext-plans/src/agg/acc.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs
@@ -456,10 +456,10 @@ impl AccBytesColumn {
     fn refresh_heap_mem_used(&mut self) {
         self.heap_mem_used = 0;
         for item in &self.items {
-            if let Some(v) = item {
-                if v.spilled() {
-                    self.heap_mem_used += v.capacity();
-                }
+            if let Some(v) = item
+                && v.spilled()
+            {
+                self.heap_mem_used += v.capacity();
             }
         }
     }
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
index 5f15b3e67..6a840556a 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -126,10 +126,10 @@ impl AggTable {
         self.update_mem_used(mem_used).await?;
 
         // update udaf memory tracker, spill if update failed
-        if let Some(udaf_mem_tracker) = self.agg_ctx.get_udaf_mem_tracker() {
-            if !udaf_mem_tracker.update_used()? {
-                self.force_spill().await?;
-            }
+        if let Some(udaf_mem_tracker) = self.agg_ctx.get_udaf_mem_tracker()
+            && !udaf_mem_tracker.update_used()?
+        {
+            self.force_spill().await?;
         }
         Ok(())
     }
@@ -329,10 +329,10 @@ impl MemConsumer for AggTable {
 
         // use pre-merging if cardinality is low
         let mut next_is_hashing = false;
-        if let InMemData::Hashing(hashing_data) = &in_mem.data {
-            if hashing_data.cardinality_ratio() < 0.5 {
-                next_is_hashing = true;
-            }
+        if let InMemData::Hashing(hashing_data) = &in_mem.data
+            && hashing_data.cardinality_ratio() < 0.5
+        {
+            next_is_hashing = true;
         }
 
         let cur_in_mem = in_mem.renew(next_is_hashing)?;
@@ -449,15 +449,14 @@ impl InMemTable {
         if self.id == 0 // only works on first table
             && !self.agg_ctx.is_expand_agg
            && self.agg_ctx.supports_partial_skipping
+            && let InMemData::Hashing(hashing_data) = &self.data
         {
-            if let InMemData::Hashing(hashing_data) = &self.data {
-                let cardinality_ratio = hashing_data.cardinality_ratio();
-                if cardinality_ratio > self.agg_ctx.partial_skipping_ratio {
-                    log::warn!(
-                        "AggTable cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
-                    );
-                    return true;
-                }
+            let cardinality_ratio = hashing_data.cardinality_ratio();
+            if cardinality_ratio > self.agg_ctx.partial_skipping_ratio {
+                log::warn!(
+                    "AggTable cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
+                );
+                return true;
             }
         }
         false
diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs
index 474451e0e..ed9955d32 100644
--- a/native-engine/datafusion-ext-plans/src/agg_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs
@@ -354,7 +354,7 @@ fn execute_agg_sorted(
             staging_keys.clear();
             staging_acc_table.resize(0);
             exec_ctx.baseline_metrics().record_output(num_rows);
-            sender.send((batch)).await;
+            sender.send(batch).await;
         }};
     }
 
@@ -730,7 +730,7 @@ mod fuzztest {
             // will trigger spill
             let key = (rand::random::<u32>() % 1_000_000) as i64;
            let val = (rand::random::<u32>() % 1_000_000) as f64;
-            let test_null = rand::random::<u32>() % 1000 == 0;
+            let test_null = rand::random::<u32>().is_multiple_of(1000);
             key_builder.append_value(key);
 
             if !test_null {
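The fuzztest tweak swaps `% 1000 == 0` for `u32::is_multiple_of`, the form clippy has suggested since its stabilization in Rust 1.87. The two are identical for a nonzero divisor; `is_multiple_of` additionally defines the zero-divisor case instead of panicking. A quick standalone check (not repo code):

```rust
fn main() {
    for n in [0u32, 1, 999, 1000, 123_000] {
        // identical to `n % 1000 == 0` for any nonzero divisor
        assert_eq!(n.is_multiple_of(1000), n % 1000 == 0);
    }
    // the divisor-zero case is defined (only 0 is a multiple of 0),
    // whereas `n % 0` would panic:
    assert!(0u32.is_multiple_of(0));
    assert!(!5u32.is_multiple_of(0));
    println!("is_multiple_of checks pass");
}
```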
diff --git a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
index 7678a8d09..3341d6b3a 100644
--- a/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
+++ b/native-engine/datafusion-ext-plans/src/common/cached_exprs_evaluator.rs
@@ -211,13 +211,13 @@ fn transform_to_cached_exprs(exprs: &[PhysicalExprRef]) -> Result<(Vec
-        if let Ok(case_expr) = downcast_any!(expr, CaseExpr) {
-            if case_expr.expr().is_some() {
-                let children = case_expr.children();
-                if children.len() >= 2 {
-                    // cache first `when` expr
-                    collect_dups(&expr.children()[1], current_count, expr_counts, dups);
-                }
+        if let Ok(case_expr) = downcast_any!(expr, CaseExpr)
+            && case_expr.expr().is_some()
+        {
+            let children = case_expr.children();
+            if children.len() >= 2 {
+                // cache first `when` expr
+                collect_dups(&expr.children()[1], current_count, expr_counts, dups);
             }
         }
     } else {
@@ -272,11 +272,12 @@ fn transform_to_cached_exprs(exprs: &[PhysicalExprRef]) -> Result<(Vec
             .collect::<Vec<_>>();
         children[0] = transform(children[0].clone(), cached_expr_ids, cache)?;
-        if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
-            if children.len() >= 2 && case_expr.expr().is_some() {
-                // cache first `when` expr
-                children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
-            }
+        if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>()
+            && children.len() >= 2
+            && case_expr.expr().is_some()
+        {
+            // cache first `when` expr
+            children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
         }
         expr.clone().with_new_children(children)?
     } else {
diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
index 23ee22f1e..d19910931 100644
--- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs
+++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -431,10 +431,10 @@ impl ExecutionContext {
         match ready!(self.as_mut().input.poll_next_unpin(cx)) {
             Some(r) => Poll::Ready(Some(r)),
             None => {
-                if let Some(on_completion) = self.as_mut().on_completion.take() {
-                    if let Err(e) = on_completion() {
-                        return Poll::Ready(Some(Err(e)));
-                    }
+                if let Some(on_completion) = self.as_mut().on_completion.take()
+                    && let Err(e) = on_completion()
+                {
+                    return Poll::Ready(Some(Err(e)));
                 }
                 Poll::Ready(None)
             }
diff --git a/native-engine/datafusion-ext-plans/src/debug_exec.rs b/native-engine/datafusion-ext-plans/src/debug_exec.rs
index 5654cc2d0..35c192b2f 100644
--- a/native-engine/datafusion-ext-plans/src/debug_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/debug_exec.rs
@@ -109,7 +109,7 @@ impl ExecutionPlan for DebugExec {
         Ok(
             exec_ctx.output_with_sender("Debug", move |sender| async move {
                 while let Some(batch) = input.next().await.transpose()? {
-                    let table_str = pretty_format_batches(&[batch.clone()])?
+                    let table_str = pretty_format_batches(std::slice::from_ref(&batch))?
                         .to_string()
                         .replace('\n', &format!("\n{debug_id} - "));
                     log::info!("DebugExec(partition={partition}):\n{table_str}");
diff --git a/native-engine/datafusion-ext-plans/src/generate_exec.rs b/native-engine/datafusion-ext-plans/src/generate_exec.rs
index ae975237f..adab1b1a0 100644
--- a/native-engine/datafusion-ext-plans/src/generate_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/generate_exec.rs
@@ -402,7 +402,7 @@ mod test {
             "|   |                 | {}             |",
             "+---+-----------------+----------------+",
         ];
-        assert_batches_eq!(input, &[input_batch.clone()]);
+        assert_batches_eq!(input, std::slice::from_ref(&input_batch));
 
         let input = Arc::new(TestMemoryExec::try_new(
             &[vec![input_batch.clone()]],
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index 844b5c7b5..49e982ad9 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -455,13 +455,13 @@ fn collect_and_predicates(
     }
 
     // Handle BinaryExpr with AND operator
-    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
-        if matches!(binary.op(), Operator::And) {
-            // Recursively collect AND sub-conditions from both sides
-            collect_and_predicates(binary.left(), schema, predicates);
-            collect_and_predicates(binary.right(), schema, predicates);
-            return;
-        }
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>()
+        && matches!(binary.op(), Operator::And)
+    {
+        // Recursively collect AND sub-conditions from both sides
+        collect_and_predicates(binary.left(), schema, predicates);
+        collect_and_predicates(binary.right(), schema, predicates);
+        return;
     }
 
     // Not an AND expression, convert the whole expression
@@ -487,13 +487,13 @@ fn collect_or_predicates(
     }
 
     // Handle BinaryExpr with OR operator
-    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
-        if matches!(binary.op(), Operator::Or) {
-            // Recursively collect OR sub-conditions from both sides
-            collect_or_predicates(binary.left(), schema, predicates);
-            collect_or_predicates(binary.right(), schema, predicates);
-            return;
-        }
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>()
+        && matches!(binary.op(), Operator::Or)
+    {
+        // Recursively collect OR sub-conditions from both sides
+        collect_or_predicates(binary.left(), schema, predicates);
+        collect_or_predicates(binary.right(), schema, predicates);
+        return;
     }
 
     // Not an OR expression, convert the whole expression
@@ -667,10 +667,10 @@ fn convert_expr_to_orc_internal(
         // Convert IN to multiple OR conditions: col = val1 OR col = val2 OR ...
         let mut predicates = Vec::new();
         for list_expr in in_list.list() {
-            if let Some(lit) = list_expr.as_any().downcast_ref::<Literal>() {
-                if let Some(pred_value) = convert_scalar_value(lit.value()) {
-                    predicates.push(Predicate::eq(col_name, pred_value));
-                }
+            if let Some(lit) = list_expr.as_any().downcast_ref::<Literal>()
+                && let Some(pred_value) = convert_scalar_value(lit.value())
+            {
+                predicates.push(Predicate::eq(col_name, pred_value));
             }
         }
 
@@ -699,20 +699,20 @@ fn convert_expr_to_orc_internal(
             return None;
         }
 
-        if let Some(col) = left.as_any().downcast_ref::<Column>() {
-            if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
-                let col_name = col.name();
-                let value = lit.value();
-                return build_comparison_predicate(col_name, op, value);
-            }
+        if let Some(col) = left.as_any().downcast_ref::<Column>()
+            && let Some(lit) = right.as_any().downcast_ref::<Literal>()
+        {
+            let col_name = col.name();
+            let value = lit.value();
+            return build_comparison_predicate(col_name, op, value);
         }
 
-        if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
-            if let Some(col) = right.as_any().downcast_ref::<Column>() {
-                let col_name = col.name();
-                let value = lit.value();
-                return build_comparison_predicate_reversed(col_name, op, value);
-            }
+        if let Some(lit) = left.as_any().downcast_ref::<Literal>()
+            && let Some(col) = right.as_any().downcast_ref::<Column>()
+        {
+            let col_name = col.name();
+            let value = lit.value();
+            return build_comparison_predicate_reversed(col_name, op, value);
         }
     }
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
index 72be4e555..096f7d7a8 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
@@ -222,7 +222,7 @@ fn execute_parquet_sink(
                 let parquet_sink_context_cloned = parquet_sink_context.clone();
                 *part_writer.lock() = Some({
                     // send identity batch, after that we can achieve a new output file
-                    sender.send(($batch.slice(0, 1))).await;
+                    sender.send($batch.slice(0, 1)).await;
                     tokio::task::spawn_blocking(move || {
                         PartWriter::try_new(parquet_sink_context_cloned, $part_values)
                     })
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 7c4258f61..a0ad442a3 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -16,5 +16,5 @@
 # under the License.
 
 [toolchain]
-channel = "nightly-2025-05-09"
+channel = "nightly-2025-12-15"
 components = ["rust-src", "cargo", "rustfmt", "clippy"]
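The orc_exec.rs hunks show the most useful shape of the let-chain rewrite: two fallible downcasts (column and literal) chained in one condition instead of pyramid nesting. The pattern transfers to any `dyn Any`-style dispatch; below is a self-contained sketch with hypothetical `Column`/`Literal` types, not this repo's expression types:

```rust
use std::any::Any;

struct Column(String);
struct Literal(i64);

// Both operands must downcast before the body runs; either failure
// falls through to `None` without an extra nesting level.
fn eq_predicate(left: &dyn Any, right: &dyn Any) -> Option<String> {
    if let Some(col) = left.downcast_ref::<Column>()
        && let Some(lit) = right.downcast_ref::<Literal>()
    {
        return Some(format!("{} = {}", col.0, lit.1));
    }
    None
}

fn main() {
    let col = Column("price".to_string());
    let lit = Literal(42);
    assert_eq!(eq_predicate(&col, &lit).as_deref(), Some("price = 42"));
    assert_eq!(eq_predicate(&lit, &col), None);
}
```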