8 changes: 4 additions & 4 deletions native-engine/auron-memmgr/src/spill.rs
@@ -166,10 +166,10 @@ impl Drop for FileSpill {
         self.1
             .disk_spill_iotime
             .add_duration(Duration::from_nanos(self.1.mem_spill_iotime.value() as u64));
-        if let Some(file_path) = &self.2 {
-            if let Err(e) = fs::remove_file(file_path) {
-                warn!("Was unable to delete spill file: {file_path}. error: {e}");
-            }
+        if let Some(file_path) = &self.2
+            && let Err(e) = fs::remove_file(file_path)
+        {
+            warn!("Was unable to delete spill file: {file_path}. error: {e}");
         }
     }
 }
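The rewrite above collapses the nested `if let` into a let-chain, keeping the early-exit logic flat. A minimal standalone sketch of the pattern (the `cleanup` helper is hypothetical, not part of the PR; let-chains are stable since Rust 1.88 under the 2024 edition and have long been available on the nightly toolchain this crate builds with):

```rust
use std::fs;

// Hypothetical helper mirroring FileSpill::drop: delete a file if a path is
// set, and log (rather than panic) when deletion fails.
fn cleanup(path: Option<&str>) {
    // One chain instead of two nested `if` blocks: both patterns must match
    // for the body to run, and evaluation short-circuits left to right.
    if let Some(file_path) = path
        && let Err(e) = fs::remove_file(file_path)
    {
        eprintln!("Was unable to delete spill file: {file_path}. error: {e}");
    }
}

fn main() {
    cleanup(Some("/tmp/not-a-real-spill-file")); // logs: file does not exist
    cleanup(None); // no-op: the chain stops at the first non-matching pattern
}
```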
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-commons/src/hash/xxhash.rs
@@ -98,7 +98,7 @@ fn xxh64_merge_round(mut hash: u64, acc: u64) -> u64 {
 
 #[inline]
 fn xxh_rotl64(value: u64, amt: i32) -> u64 {
-    (value << (amt % 64)) | (value >> (64 - amt % 64))
+    value.rotate_left((amt as u32) % 64)
 }
 
 #[inline]
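`u64::rotate_left` is clearer and safer than the manual shift-or: the old expression panics in debug builds when `amt` is a multiple of 64 (the `64 - amt % 64` right shift overflows), while `rotate_left` is defined for every amount and lowers to a single rotate instruction on common targets. A quick equivalence check (standalone sketch, not the crate's own `xxh_rotl64`; xxHash only rotates by small positive constants, so the `as u32` cast is benign there):

```rust
// Old formulation: correct only when amt % 64 != 0.
fn rotl64_manual(value: u64, amt: i32) -> u64 {
    (value << (amt % 64)) | (value >> (64 - amt % 64))
}

// New formulation: total over all rotation amounts.
fn rotl64_intrinsic(value: u64, amt: i32) -> u64 {
    value.rotate_left((amt as u32) % 64)
}

fn main() {
    let v: u64 = 0x0123_4567_89AB_CDEF;
    for amt in [1, 13, 31, 63] {
        assert_eq!(rotl64_manual(v, amt), rotl64_intrinsic(v, amt));
    }
    // Only the new version handles rotation by 0 (or 64) without overflow.
    assert_eq!(rotl64_intrinsic(v, 0), v);
}
```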
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-commons/src/io/batch_serde.rs
@@ -750,7 +750,7 @@ mod test {
             "| [6, 7]    | [6, 7]    |",
             "+-----------+-----------+"
         ],
-        std::slice::from_ref(&batch)
+        std::slice::from_ref(&batch)
     );
 
     // test read after write
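`std::slice::from_ref` turns a `&T` into a one-element `&[T]` without cloning, which is all these call sites (the assert macros, `write_batch`, `pretty_format_batches`) actually need; `&[batch.clone()]` built a temporary array by cloning first, cheap for `Arc`-backed Arrow data but still a needless refcount bump. A minimal sketch of the pattern, with `Arc<str>` standing in for `RecordBatch`:

```rust
use std::sync::Arc;

// Stand-in for an API that wants a slice of batches.
fn takes_slice(items: &[Arc<str>]) -> usize {
    items.len()
}

fn main() {
    let batch: Arc<str> = Arc::from("record batch stand-in");

    // Before: clone just to build a temporary one-element array.
    let a = takes_slice(&[batch.clone()]);

    // After: borrow the existing value as a length-1 slice; no clone at all.
    let b = takes_slice(std::slice::from_ref(&batch));

    assert_eq!(a, b);
}
```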
@@ -336,8 +336,8 @@ mod tests {
         let test_array2: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
         let schema = Arc::new(Schema::new(vec![Field::new("", DataType::Utf8, false)]));
 
-        writer.write_batch(2, &[test_array1.clone()])?;
-        writer.write_batch(2, &[test_array2.clone()])?;
+        writer.write_batch(2, std::slice::from_ref(&test_array1))?;
+        writer.write_batch(2, std::slice::from_ref(&test_array2))?;
         writer.finish_current_buf()?;
 
         let mut reader = IpcCompressionReader::new(Cursor::new(buf));
9 changes: 4 additions & 5 deletions native-engine/datafusion-ext-commons/src/lib.rs
@@ -16,7 +16,6 @@
 #![allow(internal_features)]
 #![feature(core_intrinsics)]
 #![feature(slice_swap_unchecked)]
-#![feature(vec_into_raw_parts)]
 
 use auron_jni_bridge::conf::{
     BATCH_SIZE, IntConf, SUGGESTED_BATCH_MEM_SIZE, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE,
@@ -141,21 +140,21 @@ macro_rules! assume {
 macro_rules! prefetch_read_data {
     ($e:expr) => {{
         // safety: use prefetch
-        let locality = 3;
+        const LOCALITY: i32 = 3;
         #[allow(unused_unsafe)]
         unsafe {
-            std::intrinsics::prefetch_read_data($e, locality)
+            std::intrinsics::prefetch_read_data::<_, { LOCALITY }>($e)
         }
     }};
 }
 #[macro_export]
 macro_rules! prefetch_write_data {
     ($e:expr) => {{
         // safety: use prefetch
-        let locality = 3;
+        const LOCALITY: i32 = 3;
        #[allow(unused_unsafe)]
         unsafe {
-            std::intrinsics::prefetch_write_data($e, locality)
+            std::intrinsics::prefetch_write_data::<_, { LOCALITY }>($e)
         }
     }};
 }
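This tracks a nightly change to `core::intrinsics`: the prefetch intrinsics now take the locality level as a const generic parameter instead of a runtime `i32`, so the hint is guaranteed to be a compile-time constant (the first hunk also drops the now-unused `vec_into_raw_parts` feature gate). A nightly-only sketch of the new call shape, using the same locality 3 (keep in all cache levels) as the macros above; the exact intrinsic signature is whatever the pinned nightly ships and may change again:

```rust
#![allow(internal_features)]
#![feature(core_intrinsics)]

fn main() {
    let data = vec![0u8; 4096];
    let ptr = data.as_ptr();
    // A prefetch is only a hint to the CPU; the program behaves identically
    // without it, which is why the surrounding macros tolerate it freely.
    #[allow(unused_unsafe)]
    unsafe {
        std::intrinsics::prefetch_read_data::<_, 3>(ptr);
    }
    assert_eq!(data[0], 0);
}
```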
12 changes: 6 additions & 6 deletions native-engine/datafusion-ext-commons/src/spark_hash.rs
@@ -467,7 +467,7 @@ mod tests {
         ])) as ArrayRef;
 
         // generated with Murmur3Hash(Seq(Literal(1L)), 42).eval() since Spark is tested
-        let hashes = create_murmur3_hashes(5, &[i.clone()], 42);
+        let hashes = create_murmur3_hashes(5, std::slice::from_ref(&i), 42);
         let expected: Vec<i32> = [
             0x99f0149d_u32,
             0x9c67b85d,
@@ -482,7 +482,7 @@
 
         // generated with XxHash64(Seq(Literal(1L)), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_xxhash64_hashes(5, &[i.clone()], 42);
+        let hashes = create_xxhash64_hashes(5, std::slice::from_ref(&i), 42);
         let expected = vec![
             -7001672635703045582,
             -5252525462095825812,
@@ -495,11 +495,11 @@
 
     #[test]
     fn test_str() {
-        let i = Arc::new(StringArray::from(vec!["hello", "bar", "", "😁", "天地"]));
+        let i = Arc::new(StringArray::from(vec!["hello", "bar", "", "😁", "天地"])) as ArrayRef;
 
         // generated with Murmur3Hash(Seq(Literal("")), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_murmur3_hashes(5, &[i.clone()], 42);
+        let hashes = create_murmur3_hashes(5, std::slice::from_ref(&i), 42);
         let expected: Vec<i32> = [3286402344_u32, 2486176763, 142593372, 885025535, 2395000894]
             .into_iter()
             .map(|v| v as i32)
@@ -508,7 +508,7 @@
 
         // generated with XxHash64(Seq(Literal("")), 42).eval() since Spark is tested
         // against this as well
-        let hashes = create_xxhash64_hashes(5, &[i.clone()], 42);
+        let hashes = create_xxhash64_hashes(5, std::slice::from_ref(&i), 42);
         let expected = vec![
             -4367754540140381902,
             -1798770879548125814,
@@ -541,7 +541,7 @@ mod tests {
         let array_ref = Arc::new(list_array) as ArrayRef;
 
         // Test Murmur3 hash
-        let hashes = create_murmur3_hashes(3, &[array_ref.clone()], 42);
+        let hashes = create_murmur3_hashes(3, std::slice::from_ref(&array_ref), 42);
         assert_eq!(hashes, vec![-222940379, -374492525, -331964951]);
         Ok(())
     }
7 changes: 2 additions & 5 deletions native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -64,11 +64,8 @@ pub fn spark_quarter(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         .expect("date_part(Month) must return Int32Array");
 
     // Compute quarter: ((month - 1) / 3) + 1, preserving NULLs
-    let quarter = Int32Array::from_iter(
-        month_arr
-            .iter()
-            .map(|opt_m| opt_m.map(|m| ((m - 1) / 3 + 1))),
-    );
+    let quarter =
+        Int32Array::from_iter(month_arr.iter().map(|opt_m| opt_m.map(|m| (m - 1) / 3 + 1)));
 
     Ok(ColumnarValue::Array(Arc::new(quarter)))
 }
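The reformatted closure computes the quarter with pure integer arithmetic: `(m - 1) / 3 + 1` maps each block of three months to one quarter, while `opt_m.map(...)` passes NULLs through untouched. A standalone check of the formula (plain Rust, no Arrow needed):

```rust
// Quarter from month number, same formula as spark_quarter above.
fn quarter(month: i32) -> i32 {
    // Integer division groups months into blocks of three:
    // 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4.
    (month - 1) / 3 + 1
}

fn main() {
    let quarters: Vec<i32> = (1..=12).map(quarter).collect();
    assert_eq!(quarters, vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]);
}
```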
@@ -305,15 +305,15 @@ impl HiveGetJsonObjectEvaluator {
         json_str: &str,
     ) -> std::result::Result<Option<String>, HiveGetJsonObjectError> {
         // first try parsing with sonic-rs and fail-backing to serde-json
-        if let Ok(root_value) = sonic_rs::from_str::<sonic_rs::Value>(json_str) {
-            if let Ok(v) = self.evaluate_with_value_sonic(&root_value) {
-                return Ok(v);
-            }
+        if let Ok(root_value) = sonic_rs::from_str::<sonic_rs::Value>(json_str)
+            && let Ok(v) = self.evaluate_with_value_sonic(&root_value)
+        {
+            return Ok(v);
         }
-        if let Ok(root_value) = serde_json::from_str::<serde_json::Value>(json_str) {
-            if let Ok(v) = self.evaluate_with_value_serde_json(&root_value) {
-                return Ok(v);
-            }
+        if let Ok(root_value) = serde_json::from_str::<serde_json::Value>(json_str)
+            && let Ok(v) = self.evaluate_with_value_serde_json(&root_value)
+        {
+            return Ok(v);
         }
         Err(HiveGetJsonObjectError::InvalidInput)
     }
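The evaluator keeps its two-tier strategy: parse with sonic-rs first (the fast path), and only if that tier fails anywhere, retry with serde-json; the let-chains just flatten each tier into a single `if`. A minimal sketch of the control flow, using serde_json for the fast tier too so it runs with only that dependency (the `get_field` helper is hypothetical, not the PR's evaluator):

```rust
fn get_field(json_str: &str, key: &str) -> Option<String> {
    // Tier 1: if parsing *and* lookup both succeed, return immediately;
    // any failure falls through instead of nesting another `if`.
    if let Ok(root) = serde_json::from_str::<serde_json::Value>(json_str)
        && let Some(v) = root.get(key)
    {
        return Some(v.to_string());
    }
    // Tier 2 (in the PR: serde_json after sonic_rs) would go here.
    None
}

fn main() {
    assert_eq!(get_field(r#"{"a": 1}"#, "a").as_deref(), Some("1"));
    assert_eq!(get_field("not json", "a"), None);
}
```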
8 changes: 4 additions & 4 deletions native-engine/datafusion-ext-functions/src/spark_strings.rs
@@ -216,10 +216,10 @@ pub fn string_concat_ws(args: &[ColumnarValue]) -> Result<ColumnarValue> {
                 if let Ok(s) = as_string_array(&array).cloned() {
                     return Ok(Arg::Array(s));
                 }
-                if let Ok(l) = as_list_array(&array).cloned() {
-                    if l.value_type() == DataType::Utf8 {
-                        return Ok(Arg::List(l));
-                    }
+                if let Ok(l) = as_list_array(&array).cloned()
+                    && l.value_type() == DataType::Utf8
+                {
+                    return Ok(Arg::List(l));
                 }
             }
             ColumnarValue::Scalar(scalar) => {
8 changes: 4 additions & 4 deletions native-engine/datafusion-ext-plans/src/agg/acc.rs
@@ -456,10 +456,10 @@ impl AccBytesColumn {
     fn refresh_heap_mem_used(&mut self) {
         self.heap_mem_used = 0;
         for item in &self.items {
-            if let Some(v) = item {
-                if v.spilled() {
-                    self.heap_mem_used += v.capacity();
-                }
+            if let Some(v) = item
+                && v.spilled()
+            {
+                self.heap_mem_used += v.capacity();
             }
         }
     }
31 changes: 15 additions & 16 deletions native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -126,10 +126,10 @@ impl AggTable {
         self.update_mem_used(mem_used).await?;
 
         // update udaf memory tracker, spill if update failed
-        if let Some(udaf_mem_tracker) = self.agg_ctx.get_udaf_mem_tracker() {
-            if !udaf_mem_tracker.update_used()? {
-                self.force_spill().await?;
-            }
+        if let Some(udaf_mem_tracker) = self.agg_ctx.get_udaf_mem_tracker()
+            && !udaf_mem_tracker.update_used()?
+        {
+            self.force_spill().await?;
         }
         Ok(())
     }
@@ -329,10 +329,10 @@ impl MemConsumer for AggTable {
 
         // use pre-merging if cardinality is low
         let mut next_is_hashing = false;
-        if let InMemData::Hashing(hashing_data) = &in_mem.data {
-            if hashing_data.cardinality_ratio() < 0.5 {
-                next_is_hashing = true;
-            }
+        if let InMemData::Hashing(hashing_data) = &in_mem.data
+            && hashing_data.cardinality_ratio() < 0.5
+        {
+            next_is_hashing = true;
         }
         let cur_in_mem = in_mem.renew(next_is_hashing)?;
 
@@ -449,15 +449,14 @@ impl InMemTable {
         if self.id == 0 // only works on first table
             && !self.agg_ctx.is_expand_agg
             && self.agg_ctx.supports_partial_skipping
+            && let InMemData::Hashing(hashing_data) = &self.data
         {
-            if let InMemData::Hashing(hashing_data) = &self.data {
-                let cardinality_ratio = hashing_data.cardinality_ratio();
-                if cardinality_ratio > self.agg_ctx.partial_skipping_ratio {
-                    log::warn!(
-                        "AggTable cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
-                    );
-                    return true;
-                }
+            let cardinality_ratio = hashing_data.cardinality_ratio();
+            if cardinality_ratio > self.agg_ctx.partial_skipping_ratio {
+                log::warn!(
+                    "AggTable cardinality ratio = {cardinality_ratio}, will trigger partial skipping",
+                );
+                return true;
             }
         }
         false
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/agg_exec.rs
@@ -354,7 +354,7 @@ fn execute_agg_sorted(
             staging_keys.clear();
             staging_acc_table.resize(0);
             exec_ctx.baseline_metrics().record_output(num_rows);
-            sender.send((batch)).await;
+            sender.send(batch).await;
         }};
     }
 
@@ -730,7 +730,7 @@ mod fuzztest {
             // will trigger spill
            let key = (rand::random::<u32>() % 1_000_000) as i64;
            let val = (rand::random::<u32>() % 1_000_000) as f64;
-            let test_null = rand::random::<u32>() % 1000 == 0;
+            let test_null = rand::random::<u32>().is_multiple_of(1000);
 
            key_builder.append_value(key);
            if !test_null {
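`is_multiple_of`, stabilized for unsigned integer types in recent stable Rust and suggested by clippy's newer `manual_is_multiple_of` lint (presumably the motivation here), states the intent of `% 1000 == 0` directly. A small standalone check:

```rust
fn main() {
    let n: u32 = 4000;
    // Same result as the manual modulo test for a nonzero divisor...
    assert_eq!(n % 1000 == 0, n.is_multiple_of(1000));
    // ...but total for a zero divisor: `n % 0` panics, while
    // `is_multiple_of(0)` is true only when n itself is 0.
    assert!(!7u32.is_multiple_of(0));
    assert!(0u32.is_multiple_of(0));
}
```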
@@ -211,13 +211,13 @@ fn transform_to_cached_exprs(exprs: &[PhysicalExprRef]) -> Result<(Vec<PhysicalE
             // short circuiting expression - only first child can be cached
             // first `when` expr can also be cached
             collect_dups(&expr.children()[0], current_count, expr_counts, dups);
-            if let Ok(case_expr) = downcast_any!(expr, CaseExpr) {
-                if case_expr.expr().is_some() {
-                    let children = case_expr.children();
-                    if children.len() >= 2 {
-                        // cache first `when` expr
-                        collect_dups(&expr.children()[1], current_count, expr_counts, dups);
-                    }
+            if let Ok(case_expr) = downcast_any!(expr, CaseExpr)
+                && case_expr.expr().is_some()
+            {
+                let children = case_expr.children();
+                if children.len() >= 2 {
+                    // cache first `when` expr
+                    collect_dups(&expr.children()[1], current_count, expr_counts, dups);
                 }
             }
         } else {
@@ -272,11 +272,12 @@ fn transform_to_cached_exprs(exprs: &[PhysicalExprRef]) -> Result<(Vec<PhysicalE
                 .collect::<Vec<_>>();
             children[0] = transform(children[0].clone(), cached_expr_ids, cache)?;
 
-            if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>() {
-                if children.len() >= 2 && case_expr.expr().is_some() {
-                    // cache first `when` expr
-                    children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
-                }
+            if let Some(case_expr) = expr.as_any().downcast_ref::<CaseExpr>()
+                && children.len() >= 2
+                && case_expr.expr().is_some()
+            {
+                // cache first `when` expr
+                children[1] = transform(children[1].clone(), cached_expr_ids, cache)?;
             }
             expr.clone().with_new_children(children)?
         } else {
@@ -431,10 +431,10 @@ impl ExecutionContext {
             match ready!(self.as_mut().input.poll_next_unpin(cx)) {
                 Some(r) => Poll::Ready(Some(r)),
                 None => {
-                    if let Some(on_completion) = self.as_mut().on_completion.take() {
-                        if let Err(e) = on_completion() {
-                            return Poll::Ready(Some(Err(e)));
-                        }
+                    if let Some(on_completion) = self.as_mut().on_completion.take()
+                        && let Err(e) = on_completion()
+                    {
+                        return Poll::Ready(Some(Err(e)));
                     }
                     Poll::Ready(None)
                 }
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/debug_exec.rs
@@ -109,7 +109,7 @@ impl ExecutionPlan for DebugExec {
         Ok(
             exec_ctx.output_with_sender("Debug", move |sender| async move {
                 while let Some(batch) = input.next().await.transpose()? {
-                    let table_str = pretty_format_batches(&[batch.clone()])?
+                    let table_str = pretty_format_batches(std::slice::from_ref(&batch))?
                         .to_string()
                         .replace('\n', &format!("\n{debug_id} - "));
                     log::info!("DebugExec(partition={partition}):\n{table_str}");
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/generate_exec.rs
@@ -402,7 +402,7 @@ mod test {
             "|   |                 | {}             |",
             "+---+-----------------+----------------+",
         ];
-        assert_batches_eq!(input, &[input_batch.clone()]);
+        assert_batches_eq!(input, std::slice::from_ref(&input_batch));
 
         let input = Arc::new(TestMemoryExec::try_new(
             &[vec![input_batch.clone()]],