From c7f63e8ab1a3969b9cc7f4398a174c4acac0b74a Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Sat, 31 Jan 2026 15:55:50 +0800 Subject: [PATCH 01/27] feat: add Spark-compatible string_to_map function Adds string_to_map (alias: str_to_map) function that creates a map from a string by splitting on delimiters. - Supports 1-3 args: text, pair_delim (default ','), key_value_delim (default ':') - Returns Map - NULL input returns NULL - Empty string returns {"": NULL} (Spark behavior) - Missing key_value_delim results in NULL value - Duplicate keys: last wins (LAST_WIN policy) Test cases derived from Spark v4.0.0 ComplexTypeSuite.scala. --- datafusion/spark/src/function/map/mod.rs | 2 +- datafusion/spark/src/function/string/mod.rs | 8 + .../src/function/string/string_to_map.rs | 210 ++++++++++++++++++ .../test_files/spark/string/string_to_map.slt | 85 +++++++ 4 files changed, 304 insertions(+), 1 deletion(-) create mode 100644 datafusion/spark/src/function/string/string_to_map.rs create mode 100644 datafusion/sqllogictest/test_files/spark/string/string_to_map.slt diff --git a/datafusion/spark/src/function/map/mod.rs b/datafusion/spark/src/function/map/mod.rs index 2f596b19b422f..93d178a63daf6 100644 --- a/datafusion/spark/src/function/map/mod.rs +++ b/datafusion/spark/src/function/map/mod.rs @@ -17,7 +17,7 @@ pub mod map_from_arrays; pub mod map_from_entries; -mod utils; +pub mod utils; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index 8859beca77996..df66ddc725cd2 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -26,6 +26,7 @@ pub mod length; pub mod like; pub mod luhn_check; pub mod space; +pub mod string_to_map; pub mod substring; use datafusion_expr::ScalarUDF; @@ -45,6 +46,7 @@ make_udf_function!(format_string::FormatStringFunc, format_string); 
make_udf_function!(space::SparkSpace, space); make_udf_function!(substring::SparkSubstring, substring); make_udf_function!(base64::SparkUnBase64, unbase64); +make_udf_function!(string_to_map::SparkStringToMap, string_to_map); pub mod expr_fn { use datafusion_functions::export_functions; @@ -110,6 +112,11 @@ pub mod expr_fn { "Decodes the input string `str` from a base64 string into binary data.", str )); + export_functions!(( + string_to_map, + "Creates a map after splitting the text into key/value pairs using delimiters. Default delimiters are ',' for pair_delim and ':' for key_value_delim. Both pair_delim and key_value_delim are treated as regular expressions.", + text pair_delim key_value_delim + )); } pub fn functions() -> Vec> { @@ -127,5 +134,6 @@ pub fn functions() -> Vec> { space(), substring(), unbase64(), + string_to_map(), ] } diff --git a/datafusion/spark/src/function/string/string_to_map.rs b/datafusion/spark/src/function/string/string_to_map.rs new file mode 100644 index 0000000000000..03ab1bb58bdb2 --- /dev/null +++ b/datafusion/spark/src/function/string/string_to_map.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, StringBuilder, StringArray}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::{Result, internal_err}; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use datafusion_functions::utils::make_scalar_function; + +use crate::function::map::utils::{ + map_from_keys_values_offsets_nulls, map_type_from_key_value_types, +}; + +/// Spark-compatible `string_to_map` expression +/// +/// +/// Creates a map from a string by splitting on delimiters. +/// string_to_map(text, pairDelim, keyValueDelim) -> Map +/// +/// - text: The input string +/// - pairDelim: Delimiter between key-value pairs (default: ',') +/// - keyValueDelim: Delimiter between key and value (default: ':') +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkStringToMap { + signature: Signature, + aliases: Vec, +} + +impl Default for SparkStringToMap { + fn default() -> Self { + Self::new() + } +} + +impl SparkStringToMap { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + // string_to_map(text) + TypeSignature::String(1), + // string_to_map(text, pairDelim) + TypeSignature::String(2), + // string_to_map(text, pairDelim, keyValueDelim) + TypeSignature::String(3), + ], + Volatility::Immutable, + ), + aliases: vec![String::from("str_to_map")], + } + } +} + +impl ScalarUDFImpl for SparkStringToMap { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "string_to_map" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let map_type = 
map_type_from_key_value_types(&DataType::Utf8, &DataType::Utf8); + Ok(Arc::new(Field::new(self.name(), map_type, nullable))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + make_scalar_function(string_to_map_inner, vec![])(&args.args) + } +} + +fn string_to_map_inner(args: &[ArrayRef]) -> Result { + let text_array = &args[0]; + + // Get delimiters with defaults + let pair_delim = if args.len() > 1 { + get_scalar_string(&args[1])? + } else { + ",".to_string() + }; + + let kv_delim = if args.len() > 2 { + get_scalar_string(&args[2])? + } else { + ":".to_string() + }; + + // Process each row + let text_array = text_array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Expected StringArray for text argument".to_string(), + ) + })?; + + let num_rows = text_array.len(); + let mut keys_builder = StringBuilder::new(); + let mut values_builder = StringBuilder::new(); + let mut offsets: Vec = vec![0]; + let mut null_buffer = vec![true; num_rows]; + + for row_idx in 0..num_rows { + if text_array.is_null(row_idx) { + null_buffer[row_idx] = false; + offsets.push(*offsets.last().unwrap()); + continue; + } + + let text = text_array.value(row_idx); + if text.is_empty() { + // Empty string -> map with empty key and NULL value (Spark behavior) + keys_builder.append_value(""); + values_builder.append_null(); + offsets.push(offsets.last().unwrap() + 1); + continue; + } + + let pairs: Vec<&str> = text.split(&pair_delim).collect(); + let mut count = 0; + + for pair in pairs { + if pair.is_empty() { + continue; + } + + let kv: Vec<&str> = pair.splitn(2, &kv_delim).collect(); + let key = kv[0]; + let value = if kv.len() > 1 { Some(kv[1]) } else { None }; + + keys_builder.append_value(key); + if let Some(v) = value { + values_builder.append_value(v); + } else { + values_builder.append_null(); + } + count += 1; + } + + offsets.push(offsets.last().unwrap() + count); + } + + let keys_array: ArrayRef = 
Arc::new(keys_builder.finish()); + let values_array: ArrayRef = Arc::new(values_builder.finish()); + + // Create null buffer + let null_buffer = arrow::buffer::NullBuffer::from(null_buffer); + + map_from_keys_values_offsets_nulls( + &keys_array, + &values_array, + &offsets, + &offsets, + Some(&null_buffer), + Some(&null_buffer), + ) +} + +/// Extract scalar string value from array (assumes all values are the same) +fn get_scalar_string(array: &ArrayRef) -> Result { + let string_array = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Expected StringArray for delimiter".to_string(), + ) + })?; + + if string_array.len() == 0 { + return Ok(",".to_string()); + } + + Ok(string_array.value(0).to_string()) +} diff --git a/datafusion/sqllogictest/test_files/spark/string/string_to_map.slt b/datafusion/sqllogictest/test_files/spark/string/string_to_map.slt new file mode 100644 index 0000000000000..3c6770b99da62 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/string/string_to_map.slt @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Tests for Spark-compatible string_to_map function +# https://spark.apache.org/docs/latest/api/sql/index.html#str_to_map +# +# Test cases derived from Spark test("StringToMap"): +# https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala#L525-L618 +# +# Note: Duplicate key handling uses LAST_WIN policy (not EXCEPTION which is Spark default) + +# Basic test with explicit delimiters +query ? +SELECT string_to_map('a:1,b:2,c:3', ',', ':'); +---- +{a: 1, b: 2, c: 3} + +# Default delimiters (1-arg form) +query ? +SELECT string_to_map('a:1,b:2,c:3'); +---- +{a: 1, b: 2, c: 3} + +# Alias str_to_map +query ? +SELECT str_to_map('a:1,b:2'); +---- +{a: 1, b: 2} + +# Custom delimiters with '=' +query ? +SELECT string_to_map('a=1,b=2,c=3', ',', '='); +---- +{a: 1, b: 2, c: 3} + +# NULL input returns NULL +query ? +SELECT string_to_map(NULL, ',', ':'); +---- +NULL + +# Empty string returns map with empty key and NULL value +query ? +SELECT string_to_map('', ',', ':'); +---- +{: NULL} + +# Single key without value returns NULL value +query ? +SELECT string_to_map('a', ',', ':'); +---- +{a: NULL} + +# Missing key-value delimiter results in NULL value +query ? +SELECT string_to_map('a,b:2', ',', ':'); +---- +{a: NULL, b: 2} + +# Duplicate keys: last wins (LAST_WIN policy) +# Note: map order may differ from Spark, but values are correct +query ? +SELECT string_to_map('a:1,b:2,a:3', ',', ':'); +---- +{b: 2, a: 3} + +# Preserve spaces in values +query ? +SELECT string_to_map('a: ,b:2', ',', ':'); +---- +{a: , b: 2} From 8e85fea4a8eb454be7e3478c95bbc931befe0e64 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Mon, 2 Feb 2026 20:12:04 +0800 Subject: [PATCH 02/27] refactor: move string_to_map from string to map module The function returns Map type so it belongs in the map module. 
Co-Authored-By: Claude Opus 4.5 --- datafusion/spark/src/function/map/mod.rs | 10 +++++++++- .../src/function/{string => map}/string_to_map.rs | 0 datafusion/spark/src/function/string/mod.rs | 8 -------- 3 files changed, 9 insertions(+), 9 deletions(-) rename datafusion/spark/src/function/{string => map}/string_to_map.rs (100%) diff --git a/datafusion/spark/src/function/map/mod.rs b/datafusion/spark/src/function/map/mod.rs index 93d178a63daf6..9652a1d8c664a 100644 --- a/datafusion/spark/src/function/map/mod.rs +++ b/datafusion/spark/src/function/map/mod.rs @@ -17,6 +17,7 @@ pub mod map_from_arrays; pub mod map_from_entries; +pub mod string_to_map; pub mod utils; use datafusion_expr::ScalarUDF; @@ -25,6 +26,7 @@ use std::sync::Arc; make_udf_function!(map_from_arrays::MapFromArrays, map_from_arrays); make_udf_function!(map_from_entries::MapFromEntries, map_from_entries); +make_udf_function!(string_to_map::SparkStringToMap, string_to_map); pub mod expr_fn { use datafusion_functions::export_functions; @@ -40,8 +42,14 @@ pub mod expr_fn { "Creates a map from array>.", arg1 )); + + export_functions!(( + string_to_map, + "Creates a map after splitting the text into key/value pairs using delimiters.", + text pair_delim key_value_delim + )); } pub fn functions() -> Vec> { - vec![map_from_arrays(), map_from_entries()] + vec![map_from_arrays(), map_from_entries(), string_to_map()] } diff --git a/datafusion/spark/src/function/string/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs similarity index 100% rename from datafusion/spark/src/function/string/string_to_map.rs rename to datafusion/spark/src/function/map/string_to_map.rs diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index df66ddc725cd2..8859beca77996 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -26,7 +26,6 @@ pub mod length; pub mod like; pub mod luhn_check; pub mod space; -pub 
mod string_to_map; pub mod substring; use datafusion_expr::ScalarUDF; @@ -46,7 +45,6 @@ make_udf_function!(format_string::FormatStringFunc, format_string); make_udf_function!(space::SparkSpace, space); make_udf_function!(substring::SparkSubstring, substring); make_udf_function!(base64::SparkUnBase64, unbase64); -make_udf_function!(string_to_map::SparkStringToMap, string_to_map); pub mod expr_fn { use datafusion_functions::export_functions; @@ -112,11 +110,6 @@ pub mod expr_fn { "Decodes the input string `str` from a base64 string into binary data.", str )); - export_functions!(( - string_to_map, - "Creates a map after splitting the text into key/value pairs using delimiters. Default delimiters are ',' for pair_delim and ':' for key_value_delim. Both pair_delim and key_value_delim are treated as regular expressions.", - text pair_delim key_value_delim - )); } pub fn functions() -> Vec> { @@ -134,6 +127,5 @@ pub fn functions() -> Vec> { space(), substring(), unbase64(), - string_to_map(), ] } From b4452fea749f5d33f923cf40c866aea8632c419e Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Mon, 2 Feb 2026 20:31:39 +0800 Subject: [PATCH 03/27] refactor: move string_to_map test from spark/string to spark/map Follows the source code move in the previous commit. 
Co-Authored-By: Claude Opus 4.5 --- .../test_files/spark/{string => map}/string_to_map.slt | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename datafusion/sqllogictest/test_files/spark/{string => map}/string_to_map.slt (100%) diff --git a/datafusion/sqllogictest/test_files/spark/string/string_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt similarity index 100% rename from datafusion/sqllogictest/test_files/spark/string/string_to_map.slt rename to datafusion/sqllogictest/test_files/spark/map/string_to_map.slt From 5389da6a25b551e0a0d48efa355cc9d3dddb8181 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 09:52:44 +0800 Subject: [PATCH 04/27] docs: add inline comments explaining string_to_map parsing logic Co-Authored-By: Claude Opus 4.5 --- .../spark/src/function/map/string_to_map.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 03ab1bb58bdb2..9cb1335db6f45 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -151,14 +151,29 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { continue; } + // Split text into key-value pairs using pair_delim. + // Example: "a:1,b:2" with pair_delim="," -> ["a:1", "b:2"] let pairs: Vec<&str> = text.split(&pair_delim).collect(); let mut count = 0; for pair in pairs { + // Skip empty pairs (e.g., from "a:1,,b:2" -> ["a:1", "", "b:2"]) if pair.is_empty() { continue; } + // Split each pair into key and value using kv_delim. + // splitn(2, ...) ensures we only split on the FIRST delimiter. 
+ // Example: "a:1:2" with kv_delim=":" -> ["a", "1:2"] (value keeps extra colons) + // + // kv[0] = key (always present) + // kv[1] = value (may not exist if no delimiter found) + // + // Examples: + // "a:1" -> kv = ["a", "1"] -> key="a", value=Some("1") + // "a" -> kv = ["a"] -> key="a", value=None + // "a:" -> kv = ["a", ""] -> key="a", value=Some("") + // ":1" -> kv = ["", "1"] -> key="", value=Some("1") let kv: Vec<&str> = pair.splitn(2, &kv_delim).collect(); let key = kv[0]; let value = if kv.len() > 1 { Some(kv[1]) } else { None }; From bf51d68e4211abb72c472342e16d05d25276dea5 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 10:06:10 +0800 Subject: [PATCH 05/27] fix: keep utils module private in map module Co-Authored-By: Claude Opus 4.5 --- datafusion/spark/src/function/map/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/map/mod.rs b/datafusion/spark/src/function/map/mod.rs index 9652a1d8c664a..9c8d01303601b 100644 --- a/datafusion/spark/src/function/map/mod.rs +++ b/datafusion/spark/src/function/map/mod.rs @@ -18,7 +18,7 @@ pub mod map_from_arrays; pub mod map_from_entries; pub mod string_to_map; -pub mod utils; +mod utils; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; From 50b0c96c07619ae7c918d7e4bbfbab4c362a320b Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 10:09:20 +0800 Subject: [PATCH 06/27] fix lint --- datafusion/spark/src/function/map/string_to_map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 9cb1335db6f45..18dc6578b6c24 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, StringBuilder, StringArray}; +use arrow::array::{Array, ArrayRef, 
StringArray, StringBuilder}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::{Result, internal_err}; use datafusion_expr::{ From 09fe99e6c044126b404bf5c417bef38a72678d94 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 10:44:44 +0800 Subject: [PATCH 07/27] refine test cases --- .../test_files/spark/map/string_to_map.slt | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt index 3c6770b99da62..db0eeeb68bae8 100644 --- a/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt @@ -23,63 +23,82 @@ # # Note: Duplicate key handling uses LAST_WIN policy (not EXCEPTION which is Spark default) -# Basic test with explicit delimiters -query ? -SELECT string_to_map('a:1,b:2,c:3', ',', ':'); ----- -{a: 1, b: 2, c: 3} - -# Default delimiters (1-arg form) +# s0: Basic test with default delimiters query ? SELECT string_to_map('a:1,b:2,c:3'); ---- {a: 1, b: 2, c: 3} -# Alias str_to_map +# s1: Preserve spaces in values query ? -SELECT str_to_map('a:1,b:2'); +SELECT string_to_map('a: ,b:2'); ---- -{a: 1, b: 2} +{a: , b: 2} -# Custom delimiters with '=' +# s2: Custom key-value delimiter '=' query ? SELECT string_to_map('a=1,b=2,c=3', ',', '='); ---- {a: 1, b: 2, c: 3} -# NULL input returns NULL +# s3: Empty string returns map with empty key and NULL value query ? -SELECT string_to_map(NULL, ',', ':'); +SELECT string_to_map('', ',', '='); ---- -NULL +{: NULL} -# Empty string returns map with empty key and NULL value +# s4: Custom pair delimiter '_' query ? -SELECT string_to_map('', ',', ':'); +SELECT string_to_map('a:1_b:2_c:3', '_', ':'); ---- -{: NULL} +{a: 1, b: 2, c: 3} -# Single key without value returns NULL value +# s5: Single key without value returns NULL value query ? 
-SELECT string_to_map('a', ',', ':');
+SELECT string_to_map('a');
 ----
 {a: NULL}
 
-# Missing key-value delimiter results in NULL value
+# s6: Custom delimiters '&' and '='
 query ?
-SELECT string_to_map('a,b:2', ',', ':');
+SELECT string_to_map('a=1&b=2&c=3', '&', '=');
 ----
-{a: NULL, b: 2}
+{a: 1, b: 2, c: 3}
 
 # Duplicate keys: last wins (LAST_WIN policy)
-# Note: map order may differ from Spark, but values are correct
 query ?
-SELECT string_to_map('a:1,b:2,a:3', ',', ':');
+SELECT string_to_map('a:1,b:2,a:3');
 ----
 {b: 2, a: 3}
 
-# Preserve spaces in values
+# Additional tests (DataFusion-specific)
+
+# Alias str_to_map
 query ?
-SELECT string_to_map('a: ,b:2', ',', ':');
+SELECT str_to_map('a:1,b:2');
 ----
-{a: , b: 2}
+{a: 1, b: 2}
+
+# NULL input returns NULL
+query ?
+SELECT string_to_map(NULL, ',', ':');
+----
+NULL
+
+# Explicit 3-arg form
+query ?
+SELECT string_to_map('a:1,b:2,c:3', ',', ':');
+----
+{a: 1, b: 2, c: 3}
+
+# Missing key-value delimiter results in NULL value
+query ?
+SELECT string_to_map('a,b:2', ',', ':');
+----
+{a: NULL, b: 2}
+
+query ?
+SELECT string_to_map('test');
+----
+{test: NULL}
+
From 8c4b2008f9c0057142d195a188a8e8e37887cbdb Mon Sep 17 00:00:00 2001
From: Eric Chang
Date: Tue, 3 Feb 2026 18:00:03 +0800
Subject: [PATCH 08/27] refactor: avoid heap allocation in string_to_map pairs iteration

---
 .../spark/src/function/map/string_to_map.rs | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs
index 18dc6578b6c24..1f778c983e16a 100644
--- a/datafusion/spark/src/function/map/string_to_map.rs
+++ b/datafusion/spark/src/function/map/string_to_map.rs
@@ -35,11 +35,18 @@ use crate::function::map::utils::{
 ///
 ///
 /// Creates a map from a string by splitting on delimiters.
-/// string_to_map(text, pairDelim, keyValueDelim) -> Map +/// str_to_map(text[, pairDelim[, keyValueDelim]]) -> Map /// /// - text: The input string /// - pairDelim: Delimiter between key-value pairs (default: ',') /// - keyValueDelim: Delimiter between key and value (default: ':') +/// +/// # Duplicate Key Handling +/// Currently uses LAST_WIN behavior (last value wins for duplicate keys). +/// +/// TODO: Support `spark.sql.mapKeyDedupPolicy` config (EXCEPTION vs LAST_WIN). +/// Spark 3.0+ defaults to EXCEPTION. See: +/// #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkStringToMap { signature: Signature, @@ -151,12 +158,11 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { continue; } + + let mut count = 0; // Split text into key-value pairs using pair_delim. // Example: "a:1,b:2" with pair_delim="," -> ["a:1", "b:2"] - let pairs: Vec<&str> = text.split(&pair_delim).collect(); - let mut count = 0; - - for pair in pairs { + for pair in text.split(&pair_delim) { // Skip empty pairs (e.g., from "a:1,,b:2" -> ["a:1", "", "b:2"]) if pair.is_empty() { continue; From 2b10df6c92b1da6caa01e85ee432af3d4380b81b Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 18:10:52 +0800 Subject: [PATCH 09/27] refactor: avoid heap allocation in string_to_map kv splitting --- datafusion/spark/src/function/map/string_to_map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 1f778c983e16a..d281244f09a9b 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -180,9 +180,9 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { // "a" -> kv = ["a"] -> key="a", value=None // "a:" -> kv = ["a", ""] -> key="a", value=Some("") // ":1" -> kv = ["", "1"] -> key="", value=Some("1") - let kv: Vec<&str> = pair.splitn(2, &kv_delim).collect(); - let key = kv[0]; - let value = 
if kv.len() > 1 { Some(kv[1]) } else { None }; + let mut kv_iter = pair.splitn(2, &kv_delim); + let key = kv_iter.next().unwrap_or(""); + let value = kv_iter.next(); keys_builder.append_value(key); if let Some(v) = value { From d7ab94e48324dbe3da48c03fd0bb5d2c8d942605 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 19:17:51 +0800 Subject: [PATCH 10/27] refactor: rename get_scalar_string to extract_delimiter_from_string_array --- datafusion/spark/src/function/map/string_to_map.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index d281244f09a9b..3bdfd505f7800 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -115,13 +115,13 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { // Get delimiters with defaults let pair_delim = if args.len() > 1 { - get_scalar_string(&args[1])? + extract_delimiter_from_string_array(&args[1])? } else { ",".to_string() }; let kv_delim = if args.len() > 2 { - get_scalar_string(&args[2])? + extract_delimiter_from_string_array(&args[2])? } else { ":".to_string() }; @@ -212,8 +212,8 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { ) } -/// Extract scalar string value from array (assumes all values are the same) -fn get_scalar_string(array: &ArrayRef) -> Result { +/// Extract delimiter value from [`StringArray`]. 
+fn extract_delimiter_from_string_array(array: &ArrayRef) -> Result { let string_array = array .as_any() .downcast_ref::() From 2b70b6d93bc4174d4d8b89646d62d111f13c6216 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 19:51:46 +0800 Subject: [PATCH 11/27] refactor: improve extract_delimiter_from_string_array - Replace len==0 check with assert (delimiter array should never be empty) - Add comment explaining scalar expansion in columnar execution - Add unit test for delimiter extraction (single, multi-char, expanded scalar) Co-Authored-By: Claude Opus 4.5 --- .../spark/src/function/map/string_to_map.rs | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 3bdfd505f7800..3c0b733f1e358 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -158,7 +158,6 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { continue; } - let mut count = 0; // Split text into key-value pairs using pair_delim. // Example: "a:1,b:2" with pair_delim="," -> ["a:1", "b:2"] @@ -223,9 +222,34 @@ fn extract_delimiter_from_string_array(array: &ArrayRef) -> Result { ) })?; - if string_array.len() == 0 { - return Ok(",".to_string()); - } + assert!(!string_array.is_empty(), "Delimiter array should not be empty"); + // In columnar execution, scalar delimiter is expanded to array to match batch size. + // All elements are the same, so we just take the first element. 
 Ok(string_array.value(0).to_string())
 }
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_delimiter_from_string_array() {
+        // Normal case - single element
+        let delim: ArrayRef = Arc::new(StringArray::from(vec!["&"]));
+        let result = extract_delimiter_from_string_array(&delim).unwrap();
+        assert_eq!(result, "&");
+
+        // Multi-char delimiter
+        let delim: ArrayRef = Arc::new(StringArray::from(vec!["&&"]));
+        let result = extract_delimiter_from_string_array(&delim).unwrap();
+        assert_eq!(result, "&&");
+
+        // Expanded scalar case - multiple elements (all same value).
+        // This happens when the scalar delimiter is expanded to match batch size
+        let delim: ArrayRef = Arc::new(StringArray::from(vec!["=", "=", "="]));
+        let result = extract_delimiter_from_string_array(&delim).unwrap();
+        assert_eq!(result, "=");
+    }
+}
\ No newline at end of file
From f6b3852acee6974c5760904ebec435749b955d89 Mon Sep 17 00:00:00 2001
From: Eric Chang
Date: Tue, 3 Feb 2026 19:54:36 +0800
Subject: [PATCH 12/27] test: add multi-row sqllogictests for string_to_map

- Add multi-row test with default delimiters
- Add multi-row test with custom delimiters (comma and equals)

Co-Authored-By: Claude Opus 4.5
---
 .../test_files/spark/map/string_to_map.slt | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt
index db0eeeb68bae8..046cd8239f15d 100644
--- a/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt
@@ -97,8 +97,18 @@ SELECT string_to_map('a,b:2', ',', ':');
 ----
 {a: NULL, b: 2}
 
+# Multi-row test
 query ?
-SELECT string_to_map('test');
+SELECT string_to_map(col) FROM (VALUES ('a:1,b:2'), ('x:9'), (NULL)) AS t(col);
 ----
-{test: NULL}
+{a: 1, b: 2}
+{x: 9}
+NULL
 
+# Multi-row with custom delimiter
+query ?
+SELECT string_to_map(col, ',', '=') FROM (VALUES ('a=1,b=2'), ('x=9'), (NULL)) AS t(col); +---- +{a: 1, b: 2} +{x: 9} +NULL \ No newline at end of file From dd9d97299434395cfd1ad172c7102c5d7be0587b Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 20:45:31 +0800 Subject: [PATCH 13/27] refactor: use current_offset and add table-driven unit tests - Replace `offsets.last().unwrap()` with explicit `current_offset` tracking - Add table-driven unit tests covering s0-s6 Spark test cases + null input - Add multi-row test demonstrating Arrow MapArray internal structure - Import NullBuffer at module level for cleaner code Co-Authored-By: Claude Opus 4.5 --- .../spark/src/function/map/string_to_map.rs | 185 +++++++++++++++++- 1 file changed, 181 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 3c0b733f1e358..a1606a8afa651 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -19,6 +19,7 @@ use std::any::Any; use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray, StringBuilder}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::{Result, internal_err}; use datafusion_expr::{ @@ -140,12 +141,13 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { let mut keys_builder = StringBuilder::new(); let mut values_builder = StringBuilder::new(); let mut offsets: Vec = vec![0]; + let mut current_offset: i32 = 0; let mut null_buffer = vec![true; num_rows]; for row_idx in 0..num_rows { if text_array.is_null(row_idx) { null_buffer[row_idx] = false; - offsets.push(*offsets.last().unwrap()); + offsets.push(current_offset); continue; } @@ -154,7 +156,8 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { // Empty string -> map with empty key and NULL value (Spark behavior) keys_builder.append_value(""); values_builder.append_null(); 
- offsets.push(offsets.last().unwrap() + 1); + current_offset += 1; + offsets.push(current_offset); continue; } @@ -192,14 +195,15 @@ fn string_to_map_inner(args: &[ArrayRef]) -> Result { count += 1; } - offsets.push(offsets.last().unwrap() + count); + current_offset += count; + offsets.push(current_offset); } let keys_array: ArrayRef = Arc::new(keys_builder.finish()); let values_array: ArrayRef = Arc::new(values_builder.finish()); // Create null buffer - let null_buffer = arrow::buffer::NullBuffer::from(null_buffer); + let null_buffer = NullBuffer::from(null_buffer); map_from_keys_values_offsets_nulls( &keys_array, @@ -233,6 +237,30 @@ fn extract_delimiter_from_string_array(array: &ArrayRef) -> Result { #[cfg(test)] mod tests { use super::*; + use arrow::array::MapArray; + + /// Helper to extract keys and values from MapArray for assertions + fn get_map_entries(map_array: &MapArray, row: usize) -> Vec<(String, Option)> { + if map_array.is_null(row) { + return vec![]; + } + let start = map_array.value_offsets()[row] as usize; + let end = map_array.value_offsets()[row + 1] as usize; + let keys = map_array.keys().as_any().downcast_ref::().unwrap(); + let values = map_array.values().as_any().downcast_ref::().unwrap(); + + (start..end) + .map(|i| { + let key = keys.value(i).to_string(); + let value = if values.is_null(i) { + None + } else { + Some(values.value(i).to_string()) + }; + (key, value) + }) + .collect() + } #[test] fn test_extract_delimiter_from_string_array() { @@ -252,4 +280,153 @@ mod tests { let result = extract_delimiter_from_string_array(&delim).unwrap(); assert_eq!(result, "="); } + + // Table-driven tests for string_to_map + // Test cases derived from Spark ComplexTypeSuite: + // https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala#L525-L618 + #[test] + fn test_string_to_map_cases() { + struct TestCase { + name: &'static str, + input: Option<&'static str>, // None = 
NULL input + pair_delim: Option<&'static str>, + kv_delim: Option<&'static str>, + expected: Option)>>, // None = NULL output + } + + let cases = vec![ + TestCase { + name: "s0: basic default delimiters", + input: Some("a:1,b:2,c:3"), + pair_delim: None, + kv_delim: None, + expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + }, + TestCase { + name: "s1: preserve spaces in values", + input: Some("a: ,b:2"), + pair_delim: None, + kv_delim: None, + expected: Some(vec![("a", Some(" ")), ("b", Some("2"))]), + }, + TestCase { + name: "s2: custom kv delimiter '='", + input: Some("a=1,b=2,c=3"), + pair_delim: Some(","), + kv_delim: Some("="), + expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + }, + TestCase { + name: "s3: empty string", + input: Some(""), + pair_delim: Some(","), + kv_delim: Some("="), + expected: Some(vec![("", None)]), + }, + TestCase { + name: "s4: custom pair delimiter '_'", + input: Some("a:1_b:2_c:3"), + pair_delim: Some("_"), + kv_delim: Some(":"), + expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + }, + TestCase { + name: "s5: single key no value", + input: Some("a"), + pair_delim: None, + kv_delim: None, + expected: Some(vec![("a", None)]), + }, + TestCase { + name: "s6: custom delimiters '&' and '='", + input: Some("a=1&b=2&c=3"), + pair_delim: Some("&"), + kv_delim: Some("="), + expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + }, + TestCase { + name: "null input returns null", + input: None, + pair_delim: None, + kv_delim: None, + expected: None, + }, + ]; + + for case in cases { + let text: ArrayRef = Arc::new(StringArray::from(vec![case.input])); + let args: Vec = match (case.pair_delim, case.kv_delim) { + (Some(p), Some(k)) => vec![ + text, + Arc::new(StringArray::from(vec![p])), + Arc::new(StringArray::from(vec![k])), + ], + _ => vec![text], + }; + + let result = string_to_map_inner(&args).unwrap(); + let map_array = 
result.as_any().downcast_ref::().unwrap(); + + assert_eq!(map_array.len(), 1, "case: {}", case.name); + + match case.expected { + None => { + // Expected NULL output + assert!(map_array.is_null(0), "case: {} expected NULL", case.name); + } + Some(expected_entries) => { + assert!(!map_array.is_null(0), "case: {} unexpected NULL", case.name); + let entries = get_map_entries(map_array, 0); + let expected: Vec<(String, Option)> = expected_entries + .iter() + .map(|(k, v)| (k.to_string(), v.map(|s| s.to_string()))) + .collect(); + assert_eq!(entries, expected, "case: {}", case.name); + } + } + } + } + + // Multi-row test showing Arrow array structure + // Input: ["a:1,b:2", "x:9", NULL] + // + // Arrow MapArray internal structure: + // keys: ["a", "b", "x"] (flat array of all keys) + // values: ["1", "2", "9"] (flat array of all values) + // offsets: [0, 2, 3, 3] (marks boundaries between rows) + // + // How to read offsets: + // Row 0: keys[0..2] = ["a", "b"], values[0..2] = ["1", "2"] -> {a: 1, b: 2} + // Row 1: keys[2..3] = ["x"], values[2..3] = ["9"] -> {x: 9} + // Row 2: NULL (offset unchanged: 3..3 = empty) + #[test] + fn test_multi_row_array_structure() { + let text: ArrayRef = Arc::new(StringArray::from(vec![ + Some("a:1,b:2"), + Some("x:9"), + None, + ])); + + let result = string_to_map_inner(&[text]).unwrap(); + let map_array = result.as_any().downcast_ref::().unwrap(); + + // 3 rows in output + assert_eq!(map_array.len(), 3); + + // Row 0: {a: 1, b: 2} + assert!(!map_array.is_null(0)); + let entries = get_map_entries(map_array, 0); + assert_eq!(entries, vec![ + ("a".to_string(), Some("1".to_string())), + ("b".to_string(), Some("2".to_string())), + ]); + + // Row 1: {x: 9} + assert!(!map_array.is_null(1)); + let entries = get_map_entries(map_array, 1); + assert_eq!(entries, vec![("x".to_string(), Some("9".to_string()))]); + + // Row 2: NULL + assert!(map_array.is_null(2)); + } } \ No newline at end of file From 42f2245555236228b2382d710ed721425821b2d5 Mon Sep 
17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 20:49:01 +0800 Subject: [PATCH 14/27] refactor: merge multi-row test into table-driven tests Co-Authored-By: Claude Opus 4.5 --- .../spark/src/function/map/string_to_map.rs | 129 +++++++----------- 1 file changed, 49 insertions(+), 80 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index a1606a8afa651..68d57c203ebe6 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -288,78 +288,89 @@ mod tests { fn test_string_to_map_cases() { struct TestCase { name: &'static str, - input: Option<&'static str>, // None = NULL input + inputs: Vec>, pair_delim: Option<&'static str>, kv_delim: Option<&'static str>, - expected: Option)>>, // None = NULL output + expected: Vec)>>>, } let cases = vec![ TestCase { name: "s0: basic default delimiters", - input: Some("a:1,b:2,c:3"), + inputs: vec![Some("a:1,b:2,c:3")], pair_delim: None, kv_delim: None, - expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], }, TestCase { name: "s1: preserve spaces in values", - input: Some("a: ,b:2"), + inputs: vec![Some("a: ,b:2")], pair_delim: None, kv_delim: None, - expected: Some(vec![("a", Some(" ")), ("b", Some("2"))]), + expected: vec![Some(vec![("a", Some(" ")), ("b", Some("2"))])], }, TestCase { name: "s2: custom kv delimiter '='", - input: Some("a=1,b=2,c=3"), + inputs: vec![Some("a=1,b=2,c=3")], pair_delim: Some(","), kv_delim: Some("="), - expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], }, TestCase { name: "s3: empty string", - input: Some(""), + inputs: vec![Some("")], pair_delim: Some(","), kv_delim: Some("="), - expected: Some(vec![("", None)]), + expected: 
vec![Some(vec![("", None)])], }, TestCase { name: "s4: custom pair delimiter '_'", - input: Some("a:1_b:2_c:3"), + inputs: vec![Some("a:1_b:2_c:3")], pair_delim: Some("_"), kv_delim: Some(":"), - expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], }, TestCase { name: "s5: single key no value", - input: Some("a"), + inputs: vec![Some("a")], pair_delim: None, kv_delim: None, - expected: Some(vec![("a", None)]), + expected: vec![Some(vec![("a", None)])], }, TestCase { name: "s6: custom delimiters '&' and '='", - input: Some("a=1&b=2&c=3"), + inputs: vec![Some("a=1&b=2&c=3")], pair_delim: Some("&"), kv_delim: Some("="), - expected: Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))]), + expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], }, TestCase { name: "null input returns null", - input: None, + inputs: vec![None], pair_delim: None, kv_delim: None, - expected: None, + expected: vec![None], + }, + TestCase { + name: "multi-row", + inputs: vec![Some("a:1,b:2"), Some("x:9"), None], + pair_delim: None, + kv_delim: None, + expected: vec![ + Some(vec![("a", Some("1")), ("b", Some("2"))]), + Some(vec![("x", Some("9"))]), + None, + ], }, ]; for case in cases { - let text: ArrayRef = Arc::new(StringArray::from(vec![case.input])); + let text: ArrayRef = Arc::new(StringArray::from(case.inputs)); let args: Vec = match (case.pair_delim, case.kv_delim) { (Some(p), Some(k)) => vec![ - text, - Arc::new(StringArray::from(vec![p])), - Arc::new(StringArray::from(vec![k])), + text.clone(), + Arc::new(StringArray::from(vec![p; text.len()])), + Arc::new(StringArray::from(vec![k; text.len()])), ], _ => vec![text], }; @@ -367,66 +378,24 @@ mod tests { let result = string_to_map_inner(&args).unwrap(); let map_array = result.as_any().downcast_ref::().unwrap(); - assert_eq!(map_array.len(), 1, "case: {}", case.name); - - match case.expected { 
- None => { - // Expected NULL output - assert!(map_array.is_null(0), "case: {} expected NULL", case.name); - } - Some(expected_entries) => { - assert!(!map_array.is_null(0), "case: {} unexpected NULL", case.name); - let entries = get_map_entries(map_array, 0); - let expected: Vec<(String, Option)> = expected_entries - .iter() - .map(|(k, v)| (k.to_string(), v.map(|s| s.to_string()))) - .collect(); - assert_eq!(entries, expected, "case: {}", case.name); + assert_eq!(map_array.len(), case.expected.len(), "case: {}", case.name); + + for (row_idx, expected_row) in case.expected.iter().enumerate() { + match expected_row { + None => { + assert!(map_array.is_null(row_idx), "case: {} row {} expected NULL", case.name, row_idx); + } + Some(expected_entries) => { + assert!(!map_array.is_null(row_idx), "case: {} row {} unexpected NULL", case.name, row_idx); + let entries = get_map_entries(map_array, row_idx); + let expected: Vec<(String, Option)> = expected_entries + .iter() + .map(|(k, v)| (k.to_string(), v.map(|s| s.to_string()))) + .collect(); + assert_eq!(entries, expected, "case: {} row {}", case.name, row_idx); + } } } } } - - // Multi-row test showing Arrow array structure - // Input: ["a:1,b:2", "x:9", NULL] - // - // Arrow MapArray internal structure: - // keys: ["a", "b", "x"] (flat array of all keys) - // values: ["1", "2", "9"] (flat array of all values) - // offsets: [0, 2, 3, 3] (marks boundaries between rows) - // - // How to read offsets: - // Row 0: keys[0..2] = ["a", "b"], values[0..2] = ["1", "2"] -> {a: 1, b: 2} - // Row 1: keys[2..3] = ["x"], values[2..3] = ["9"] -> {x: 9} - // Row 2: NULL (offset unchanged: 3..3 = empty) - #[test] - fn test_multi_row_array_structure() { - let text: ArrayRef = Arc::new(StringArray::from(vec![ - Some("a:1,b:2"), - Some("x:9"), - None, - ])); - - let result = string_to_map_inner(&[text]).unwrap(); - let map_array = result.as_any().downcast_ref::().unwrap(); - - // 3 rows in output - assert_eq!(map_array.len(), 3); - - // 
Row 0: {a: 1, b: 2} - assert!(!map_array.is_null(0)); - let entries = get_map_entries(map_array, 0); - assert_eq!(entries, vec![ - ("a".to_string(), Some("1".to_string())), - ("b".to_string(), Some("2".to_string())), - ]); - - // Row 1: {x: 9} - assert!(!map_array.is_null(1)); - let entries = get_map_entries(map_array, 1); - assert_eq!(entries, vec![("x".to_string(), Some("9".to_string()))]); - - // Row 2: NULL - assert!(map_array.is_null(2)); - } } \ No newline at end of file From bb4dd48abe0672abec1ad85cc2a108923efeefd3 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 20:51:32 +0800 Subject: [PATCH 15/27] test: add duplicate key test case with LAST_WIN behavior Documents current behavior and adds TODO for Spark's EXCEPTION default. Co-Authored-By: Claude Opus 4.5 --- datafusion/spark/src/function/map/string_to_map.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 68d57c203ebe6..42eed7182a0e2 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -362,6 +362,16 @@ mod tests { None, ], }, + // TODO: Spark 3.0+ defaults to EXCEPTION for duplicate keys. + // Current behavior: LAST_WIN (keeps last value, first 'a' dropped). 
+ // See: spark.sql.mapKeyDedupPolicy + TestCase { + name: "duplicate keys (LAST_WIN)", + inputs: vec![Some("a:1,b:2,a:3")], + pair_delim: None, + kv_delim: None, + expected: vec![Some(vec![("b", Some("2")), ("a", Some("3"))])], + }, ]; for case in cases { From b4d8d003527b4552ac03182dd11b317b95e9486c Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 21:21:55 +0800 Subject: [PATCH 16/27] refactor: add assert for args length in string_to_map Co-Authored-By: Claude Opus 4.5 --- datafusion/spark/src/function/map/string_to_map.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index 42eed7182a0e2..c46b4485f1d00 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -112,6 +112,12 @@ impl ScalarUDFImpl for SparkStringToMap { } fn string_to_map_inner(args: &[ArrayRef]) -> Result { + assert!( + !args.is_empty() && args.len() <= 3, + "string_to_map expects 1-3 arguments, got {}", + args.len() + ); + let text_array = &args[0]; // Get delimiters with defaults From 7b7c8413905cfec6baf70b363c414587dc3971f2 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 21:25:51 +0800 Subject: [PATCH 17/27] refactor: put helper at bottom --- .../spark/src/function/map/string_to_map.rs | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index c46b4485f1d00..e78dd0dbfe12f 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -245,29 +245,6 @@ mod tests { use super::*; use arrow::array::MapArray; - /// Helper to extract keys and values from MapArray for assertions - fn get_map_entries(map_array: &MapArray, row: usize) -> Vec<(String, Option)> { - if map_array.is_null(row) { - return 
vec![]; - } - let start = map_array.value_offsets()[row] as usize; - let end = map_array.value_offsets()[row + 1] as usize; - let keys = map_array.keys().as_any().downcast_ref::().unwrap(); - let values = map_array.values().as_any().downcast_ref::().unwrap(); - - (start..end) - .map(|i| { - let key = keys.value(i).to_string(); - let value = if values.is_null(i) { - None - } else { - Some(values.value(i).to_string()) - }; - (key, value) - }) - .collect() - } - #[test] fn test_extract_delimiter_from_string_array() { // Normal case - single element @@ -414,4 +391,28 @@ mod tests { } } } + + + /// Helper to extract keys and values from MapArray for assertions + fn get_map_entries(map_array: &MapArray, row: usize) -> Vec<(String, Option)> { + if map_array.is_null(row) { + return vec![]; + } + let start = map_array.value_offsets()[row] as usize; + let end = map_array.value_offsets()[row + 1] as usize; + let keys = map_array.keys().as_any().downcast_ref::().unwrap(); + let values = map_array.values().as_any().downcast_ref::().unwrap(); + + (start..end) + .map(|i| { + let key = keys.value(i).to_string(); + let value = if values.is_null(i) { + None + } else { + Some(values.value(i).to_string()) + }; + (key, value) + }) + .collect() + } } \ No newline at end of file From 3f424d21d0c33bc35db6c038748faedb940a4c94 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Tue, 3 Feb 2026 21:27:07 +0800 Subject: [PATCH 18/27] style: run cargo fmt Co-Authored-By: Claude Opus 4.5 --- .../spark/src/function/map/string_to_map.rs | 70 +++++++++++++++---- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/string_to_map.rs index e78dd0dbfe12f..812988e0fafa8 100644 --- a/datafusion/spark/src/function/map/string_to_map.rs +++ b/datafusion/spark/src/function/map/string_to_map.rs @@ -232,14 +232,16 @@ fn extract_delimiter_from_string_array(array: &ArrayRef) -> Result { ) })?; - 
assert!(!string_array.is_empty(), "Delimiter array should not be empty"); + assert!( + !string_array.is_empty(), + "Delimiter array should not be empty" + ); // In columnar execution, scalar delimiter is expanded to array to match batch size. // All elements are the same, so we just take the first element. Ok(string_array.value(0).to_string()) } - #[cfg(test)] mod tests { use super::*; @@ -283,7 +285,11 @@ mod tests { inputs: vec![Some("a:1,b:2,c:3")], pair_delim: None, kv_delim: None, - expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], + expected: vec![Some(vec![ + ("a", Some("1")), + ("b", Some("2")), + ("c", Some("3")), + ])], }, TestCase { name: "s1: preserve spaces in values", @@ -297,7 +303,11 @@ mod tests { inputs: vec![Some("a=1,b=2,c=3")], pair_delim: Some(","), kv_delim: Some("="), - expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], + expected: vec![Some(vec![ + ("a", Some("1")), + ("b", Some("2")), + ("c", Some("3")), + ])], }, TestCase { name: "s3: empty string", @@ -311,7 +321,11 @@ mod tests { inputs: vec![Some("a:1_b:2_c:3")], pair_delim: Some("_"), kv_delim: Some(":"), - expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], + expected: vec![Some(vec![ + ("a", Some("1")), + ("b", Some("2")), + ("c", Some("3")), + ])], }, TestCase { name: "s5: single key no value", @@ -325,7 +339,11 @@ mod tests { inputs: vec![Some("a=1&b=2&c=3")], pair_delim: Some("&"), kv_delim: Some("="), - expected: vec![Some(vec![("a", Some("1")), ("b", Some("2")), ("c", Some("3"))])], + expected: vec![Some(vec![ + ("a", Some("1")), + ("b", Some("2")), + ("c", Some("3")), + ])], }, TestCase { name: "null input returns null", @@ -376,32 +394,56 @@ mod tests { for (row_idx, expected_row) in case.expected.iter().enumerate() { match expected_row { None => { - assert!(map_array.is_null(row_idx), "case: {} row {} expected NULL", case.name, row_idx); + assert!( + map_array.is_null(row_idx), + 
"case: {} row {} expected NULL", + case.name, + row_idx + ); } Some(expected_entries) => { - assert!(!map_array.is_null(row_idx), "case: {} row {} unexpected NULL", case.name, row_idx); + assert!( + !map_array.is_null(row_idx), + "case: {} row {} unexpected NULL", + case.name, + row_idx + ); let entries = get_map_entries(map_array, row_idx); let expected: Vec<(String, Option)> = expected_entries .iter() .map(|(k, v)| (k.to_string(), v.map(|s| s.to_string()))) .collect(); - assert_eq!(entries, expected, "case: {} row {}", case.name, row_idx); + assert_eq!( + entries, expected, + "case: {} row {}", + case.name, row_idx + ); } } } } } - /// Helper to extract keys and values from MapArray for assertions - fn get_map_entries(map_array: &MapArray, row: usize) -> Vec<(String, Option)> { + fn get_map_entries( + map_array: &MapArray, + row: usize, + ) -> Vec<(String, Option)> { if map_array.is_null(row) { return vec![]; } let start = map_array.value_offsets()[row] as usize; let end = map_array.value_offsets()[row + 1] as usize; - let keys = map_array.keys().as_any().downcast_ref::().unwrap(); - let values = map_array.values().as_any().downcast_ref::().unwrap(); + let keys = map_array + .keys() + .as_any() + .downcast_ref::() + .unwrap(); + let values = map_array + .values() + .as_any() + .downcast_ref::() + .unwrap(); (start..end) .map(|i| { @@ -415,4 +457,4 @@ mod tests { }) .collect() } -} \ No newline at end of file +} From 895858cd30825a17e6c6a8f6d222c656f45e3812 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 11:34:57 +0800 Subject: [PATCH 19/27] rename: string_to_map -> str_to_map (file rename only) Pure file rename, no content changes. Prepares for the function name change in the next commit. 
Co-Authored-By: Claude Opus 4.6 --- .../spark/src/function/map/{string_to_map.rs => str_to_map.rs} | 0 .../test_files/spark/map/{string_to_map.slt => str_to_map.slt} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename datafusion/spark/src/function/map/{string_to_map.rs => str_to_map.rs} (100%) rename datafusion/sqllogictest/test_files/spark/map/{string_to_map.slt => str_to_map.slt} (100%) diff --git a/datafusion/spark/src/function/map/string_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs similarity index 100% rename from datafusion/spark/src/function/map/string_to_map.rs rename to datafusion/spark/src/function/map/str_to_map.rs diff --git a/datafusion/sqllogictest/test_files/spark/map/string_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt similarity index 100% rename from datafusion/sqllogictest/test_files/spark/map/string_to_map.slt rename to datafusion/sqllogictest/test_files/spark/map/str_to_map.slt From 6ce0eea28f948181372f9c901fe80ea2cc6c534a Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 11:36:44 +0800 Subject: [PATCH 20/27] refactor: rename SparkStringToMap to SparkStrToMap Rename struct, function name, and all references from string_to_map to str_to_map. Remove alias. 
Co-Authored-By: Claude Opus 4.6 --- datafusion/spark/src/function/map/mod.rs | 8 ++--- .../spark/src/function/map/str_to_map.rs | 36 ++++++++----------- .../test_files/spark/map/str_to_map.slt | 28 +++++++-------- 3 files changed, 33 insertions(+), 39 deletions(-) diff --git a/datafusion/spark/src/function/map/mod.rs b/datafusion/spark/src/function/map/mod.rs index 9c8d01303601b..c9ebed6f612e1 100644 --- a/datafusion/spark/src/function/map/mod.rs +++ b/datafusion/spark/src/function/map/mod.rs @@ -17,7 +17,7 @@ pub mod map_from_arrays; pub mod map_from_entries; -pub mod string_to_map; +pub mod str_to_map; mod utils; use datafusion_expr::ScalarUDF; @@ -26,7 +26,7 @@ use std::sync::Arc; make_udf_function!(map_from_arrays::MapFromArrays, map_from_arrays); make_udf_function!(map_from_entries::MapFromEntries, map_from_entries); -make_udf_function!(string_to_map::SparkStringToMap, string_to_map); +make_udf_function!(str_to_map::SparkStrToMap, str_to_map); pub mod expr_fn { use datafusion_functions::export_functions; @@ -44,12 +44,12 @@ pub mod expr_fn { )); export_functions!(( - string_to_map, + str_to_map, "Creates a map after splitting the text into key/value pairs using delimiters.", text pair_delim key_value_delim )); } pub fn functions() -> Vec> { - vec![map_from_arrays(), map_from_entries(), string_to_map()] + vec![map_from_arrays(), map_from_entries(), str_to_map()] } diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index 812988e0fafa8..f351a904186c8 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -32,7 +32,7 @@ use crate::function::map::utils::{ map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; -/// Spark-compatible `string_to_map` expression +/// Spark-compatible `str_to_map` expression /// /// /// Creates a map from a string by splitting on delimiters. 
@@ -49,53 +49,47 @@ use crate::function::map::utils::{ /// Spark 3.0+ defaults to EXCEPTION. See: /// #[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkStringToMap { +pub struct SparkStrToMap { signature: Signature, - aliases: Vec, } -impl Default for SparkStringToMap { +impl Default for SparkStrToMap { fn default() -> Self { Self::new() } } -impl SparkStringToMap { +impl SparkStrToMap { pub fn new() -> Self { Self { signature: Signature::one_of( vec![ - // string_to_map(text) + // str_to_map(text) TypeSignature::String(1), - // string_to_map(text, pairDelim) + // str_to_map(text, pairDelim) TypeSignature::String(2), - // string_to_map(text, pairDelim, keyValueDelim) + // str_to_map(text, pairDelim, keyValueDelim) TypeSignature::String(3), ], Volatility::Immutable, ), - aliases: vec![String::from("str_to_map")], } } } -impl ScalarUDFImpl for SparkStringToMap { +impl ScalarUDFImpl for SparkStrToMap { fn as_any(&self) -> &dyn Any { self } fn name(&self) -> &str { - "string_to_map" + "str_to_map" } fn signature(&self) -> &Signature { &self.signature } - fn aliases(&self) -> &[String] { - &self.aliases - } - fn return_type(&self, _arg_types: &[DataType]) -> Result { internal_err!("return_field_from_args should be used instead") } @@ -107,14 +101,14 @@ impl ScalarUDFImpl for SparkStringToMap { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(string_to_map_inner, vec![])(&args.args) + make_scalar_function(str_to_map_inner, vec![])(&args.args) } } -fn string_to_map_inner(args: &[ArrayRef]) -> Result { +fn str_to_map_inner(args: &[ArrayRef]) -> Result { assert!( !args.is_empty() && args.len() <= 3, - "string_to_map expects 1-3 arguments, got {}", + "str_to_map expects 1-3 arguments, got {}", args.len() ); @@ -266,11 +260,11 @@ mod tests { assert_eq!(result, "="); } - // Table-driven tests for string_to_map + // Table-driven tests for str_to_map // Test cases derived from Spark ComplexTypeSuite: // 
# Tests for Spark-compatible str_to_map function
# https://spark.apache.org/docs/latest/api/sql/index.html#str_to_map
#
# Test cases derived from Spark test("StringToMap"):
# https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala

# s0: Basic test with default delimiters
query ?
SELECT str_to_map('a:1,b:2,c:3');
----
{a: 1, b: 2, c: 3}

# s1: Preserve spaces in values
query ?
SELECT str_to_map('a: ,b:2');
----
{a: , b: 2}

# s2: Custom key-value delimiter '='
query ?
SELECT str_to_map('a=1,b=2,c=3', ',', '=');
----
{a: 1, b: 2, c: 3}

# s3: Empty string returns map with empty key and NULL value
query ?
SELECT str_to_map('', ',', '=');
----
{: NULL}

# s4: Custom pair delimiter '_'
query ?
SELECT str_to_map('a:1_b:2_c:3', '_', ':');
----
{a: 1, b: 2, c: 3}

# s5: Single key without value returns NULL value
query ?
SELECT str_to_map('a');
----
{a: NULL}

# s6: Custom delimiters '&' and '='
query ?
SELECT str_to_map('a=1&b=2&c=3', '&', '=');
----
{a: 1, b: 2, c: 3}

# Duplicate keys: last wins (LAST_WIN policy)
query ?
SELECT str_to_map('a:1,b:2,a:3');
----
{b: 2, a: 3}

# Two-argument form uses the default key-value delimiter ':'
query ?
SELECT str_to_map('a:1,b:2');
----
{a: 1, b: 2}

# NULL input returns NULL
query ?
SELECT str_to_map(NULL, ',', ':');
----
NULL

# Explicit 3-arg form
query ?
SELECT str_to_map('a:1,b:2,c:3', ',', ':');
----
{a: 1, b: 2, c: 3}

# Missing key-value delimiter results in NULL value
query ?
SELECT str_to_map('a,b:2', ',', ':');
----
{a: NULL, b: 2}

# Multi-row test
query ?
SELECT str_to_map(col) FROM (VALUES ('a:1,b:2'), ('x:9'), (NULL)) AS t(col);
----
{a: 1, b: 2}
{x: 9}
NULL
-SELECT string_to_map(col, ',', '=') FROM (VALUES ('a=1,b=2'), ('x=9'), (NULL)) AS t(col); +SELECT str_to_map(col, ',', '=') FROM (VALUES ('a=1,b=2'), ('x=9'), (NULL)) AS t(col); ---- {a: 1, b: 2} {x: 9} From e53a0cd7ba03b5ee425fdced89825b8d2e0f496d Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 11:38:25 +0800 Subject: [PATCH 21/27] feat: use MapBuilder, EXCEPTION dedup policy, per-row delimiters - Rewrite str_to_map to use arrow MapBuilder instead of manual offsets + map_from_keys_values_offsets_nulls - Default to EXCEPTION policy for duplicate keys (Spark 3.0+ default) - Support per-row delimiters (extract delimiter per row, not once) - Null delimiter produces null map row Co-Authored-By: Claude Opus 4.6 --- .../spark/src/function/map/str_to_map.rs | 236 ++++++++---------- .../test_files/spark/map/str_to_map.slt | 15 +- 2 files changed, 103 insertions(+), 148 deletions(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index f351a904186c8..54feebd344b25 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -16,21 +16,18 @@ // under the License. 
use std::any::Any; +use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, StringArray, StringBuilder}; -use arrow::buffer::NullBuffer; +use arrow::array::{Array, ArrayRef, AsArray, MapBuilder, MapFieldNames, StringBuilder}; use arrow::datatypes::{DataType, Field, FieldRef}; -use datafusion_common::{Result, internal_err}; +use datafusion_common::{Result, exec_err, internal_err}; use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; -use datafusion_functions::utils::make_scalar_function; -use crate::function::map::utils::{ - map_from_keys_values_offsets_nulls, map_type_from_key_value_types, -}; +use crate::function::map::utils::map_type_from_key_value_types; /// Spark-compatible `str_to_map` expression /// @@ -43,10 +40,8 @@ use crate::function::map::utils::{ /// - keyValueDelim: Delimiter between key and value (default: ':') /// /// # Duplicate Key Handling -/// Currently uses LAST_WIN behavior (last value wins for duplicate keys). -/// -/// TODO: Support `spark.sql.mapKeyDedupPolicy` config (EXCEPTION vs LAST_WIN). -/// Spark 3.0+ defaults to EXCEPTION. See: +/// Uses EXCEPTION behavior (Spark 3.0+ default): errors on duplicate keys. +/// See `spark.sql.mapKeyDedupPolicy`: /// #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkStrToMap { @@ -101,7 +96,9 @@ impl ScalarUDFImpl for SparkStrToMap { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(str_to_map_inner, vec![])(&args.args) + let arrays: Vec = ColumnarValue::values_to_arrays(&args.args)?; + let result = str_to_map_inner(&arrays)?; + Ok(ColumnarValue::Array(result)) } } @@ -112,153 +109,88 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result { args.len() ); - let text_array = &args[0]; - - // Get delimiters with defaults - let pair_delim = if args.len() > 1 { - extract_delimiter_from_string_array(&args[1])? 
+ let text_array = args[0].as_string::(); + let pair_delim_array = if args.len() > 1 { + Some(args[1].as_string::()) } else { - ",".to_string() + None }; - - let kv_delim = if args.len() > 2 { - extract_delimiter_from_string_array(&args[2])? + let kv_delim_array = if args.len() > 2 { + Some(args[2].as_string::()) } else { - ":".to_string() + None }; - // Process each row - let text_array = text_array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "Expected StringArray for text argument".to_string(), - ) - })?; - let num_rows = text_array.len(); - let mut keys_builder = StringBuilder::new(); - let mut values_builder = StringBuilder::new(); - let mut offsets: Vec = vec![0]; - let mut current_offset: i32 = 0; - let mut null_buffer = vec![true; num_rows]; + // Use field names matching map_type_from_key_value_types: "key" and "value" + let field_names = MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut map_builder = MapBuilder::new( + Some(field_names), + StringBuilder::new(), + StringBuilder::new(), + ); for row_idx in 0..num_rows { - if text_array.is_null(row_idx) { - null_buffer[row_idx] = false; - offsets.push(current_offset); + if text_array.is_null(row_idx) + || pair_delim_array.is_some_and(|a| a.is_null(row_idx)) + || kv_delim_array.is_some_and(|a| a.is_null(row_idx)) + { + map_builder.append(false)?; continue; } + // Per-row delimiter extraction + let pair_delim = pair_delim_array.map_or(",", |a| a.value(row_idx)); + let kv_delim = kv_delim_array.map_or(":", |a| a.value(row_idx)); + let text = text_array.value(row_idx); if text.is_empty() { // Empty string -> map with empty key and NULL value (Spark behavior) - keys_builder.append_value(""); - values_builder.append_null(); - current_offset += 1; - offsets.push(current_offset); + map_builder.keys().append_value(""); + map_builder.values().append_null(); + map_builder.append(true)?; continue; 
} - let mut count = 0; - // Split text into key-value pairs using pair_delim. - // Example: "a:1,b:2" with pair_delim="," -> ["a:1", "b:2"] - for pair in text.split(&pair_delim) { - // Skip empty pairs (e.g., from "a:1,,b:2" -> ["a:1", "", "b:2"]) + let mut seen_keys = HashSet::new(); + for pair in text.split(pair_delim) { if pair.is_empty() { continue; } - // Split each pair into key and value using kv_delim. - // splitn(2, ...) ensures we only split on the FIRST delimiter. - // Example: "a:1:2" with kv_delim=":" -> ["a", "1:2"] (value keeps extra colons) - // - // kv[0] = key (always present) - // kv[1] = value (may not exist if no delimiter found) - // - // Examples: - // "a:1" -> kv = ["a", "1"] -> key="a", value=Some("1") - // "a" -> kv = ["a"] -> key="a", value=None - // "a:" -> kv = ["a", ""] -> key="a", value=Some("") - // ":1" -> kv = ["", "1"] -> key="", value=Some("1") - let mut kv_iter = pair.splitn(2, &kv_delim); + let mut kv_iter = pair.splitn(2, kv_delim); let key = kv_iter.next().unwrap_or(""); let value = kv_iter.next(); - keys_builder.append_value(key); - if let Some(v) = value { - values_builder.append_value(v); - } else { - values_builder.append_null(); + // EXCEPTION policy: error on duplicate keys (Spark 3.0+ default) + if !seen_keys.insert(key) { + return exec_err!( + "Duplicate map key '{key}' was found, please check the input data. 
\ + If you want to remove the duplicates, you can set \ + spark.sql.mapKeyDedupPolicy to LAST_WIN" + ); } - count += 1; - } - current_offset += count; - offsets.push(current_offset); + map_builder.keys().append_value(key); + match value { + Some(v) => map_builder.values().append_value(v), + None => map_builder.values().append_null(), + } + } + map_builder.append(true)?; } - let keys_array: ArrayRef = Arc::new(keys_builder.finish()); - let values_array: ArrayRef = Arc::new(values_builder.finish()); - - // Create null buffer - let null_buffer = NullBuffer::from(null_buffer); - - map_from_keys_values_offsets_nulls( - &keys_array, - &values_array, - &offsets, - &offsets, - Some(&null_buffer), - Some(&null_buffer), - ) -} - -/// Extract delimiter value from [`StringArray`]. -fn extract_delimiter_from_string_array(array: &ArrayRef) -> Result { - let string_array = array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "Expected StringArray for delimiter".to_string(), - ) - })?; - - assert!( - !string_array.is_empty(), - "Delimiter array should not be empty" - ); - - // In columnar execution, scalar delimiter is expanded to array to match batch size. - // All elements are the same, so we just take the first element. - Ok(string_array.value(0).to_string()) + Ok(Arc::new(map_builder.finish())) } #[cfg(test)] mod tests { use super::*; - use arrow::array::MapArray; - - #[test] - fn test_extract_delimiter_from_string_array() { - // Normal case - single element - let delim: ArrayRef = Arc::new(StringArray::from(vec!["&"])); - let result = extract_delimiter_from_string_array(&delim).unwrap(); - assert_eq!(result, "&"); - - // Multi-char delimiter - let delim: ArrayRef = Arc::new(StringArray::from(vec!["&&"])); - let result = extract_delimiter_from_string_array(&delim).unwrap(); - assert_eq!(result, "&&"); - - // Expanded scalar case - multiple elements (all same value). 
- // This happens when the scalar delimiter is expanded to match batch size - let delim: ArrayRef = Arc::new(StringArray::from(vec!["=", "=", "="])); - let result = extract_delimiter_from_string_array(&delim).unwrap(); - assert_eq!(result, "="); - } + use arrow::array::{MapArray, StringArray}; // Table-driven tests for str_to_map // Test cases derived from Spark ComplexTypeSuite: @@ -357,16 +289,6 @@ mod tests { None, ], }, - // TODO: Spark 3.0+ defaults to EXCEPTION for duplicate keys. - // Current behavior: LAST_WIN (keeps last value, first 'a' dropped). - // See: spark.sql.mapKeyDedupPolicy - TestCase { - name: "duplicate keys (LAST_WIN)", - inputs: vec![Some("a:1,b:2,a:3")], - pair_delim: None, - kv_delim: None, - expected: vec![Some(vec![("b", Some("2")), ("a", Some("3"))])], - }, ]; for case in cases { @@ -418,7 +340,47 @@ mod tests { } } - /// Helper to extract keys and values from MapArray for assertions + #[test] + fn test_duplicate_keys_exception() { + let text: ArrayRef = Arc::new(StringArray::from(vec!["a:1,b:2,a:3"])); + let result = str_to_map_inner(&[text]); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Duplicate map key"), + "expected duplicate key error, got: {err_msg}" + ); + } + + #[test] + fn test_per_row_delimiters() { + // Each row has its own delimiters + let text: ArrayRef = + Arc::new(StringArray::from(vec![Some("a=1,b=2"), Some("x#9")])); + let pair_delim: ArrayRef = + Arc::new(StringArray::from(vec![Some(","), Some(",")])); + let kv_delim: ArrayRef = Arc::new(StringArray::from(vec![Some("="), Some("#")])); + + let result = str_to_map_inner(&[text, pair_delim, kv_delim]).unwrap(); + let map_array = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(map_array.len(), 2); + + // Row 0: "a=1,b=2" with pair=",", kv="=" + let entries0 = get_map_entries(map_array, 0); + assert_eq!( + entries0, + vec![ + ("a".to_string(), Some("1".to_string())), + ("b".to_string(), 
Some("2".to_string())), + ] + ); + + // Row 1: "x#9" with pair=",", kv="#" + let entries1 = get_map_entries(map_array, 1); + assert_eq!(entries1, vec![("x".to_string(), Some("9".to_string()))]); + } + fn get_map_entries( map_array: &MapArray, row: usize, diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt index c7eca75d1e4d4..671b8587fa189 100644 --- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt @@ -21,7 +21,7 @@ # Test cases derived from Spark test("StringToMap"): # https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala#L525-L618 # -# Note: Duplicate key handling uses LAST_WIN policy (not EXCEPTION which is Spark default) +# Duplicate key handling uses EXCEPTION policy (Spark 3.0+ default) # s0: Basic test with default delimiters query ? @@ -65,20 +65,13 @@ SELECT str_to_map('a=1&b=2&c=3', '&', '='); ---- {a: 1, b: 2, c: 3} -# Duplicate keys: last wins (LAST_WIN policy) -query ? +# Duplicate keys: EXCEPTION policy (Spark 3.0+ default) +statement error +Duplicate map key SELECT str_to_map('a:1,b:2,a:3'); ----- -{b: 2, a: 3} # Additional tests (DataFusion-specific) -# Alias str_to_map -query ? -SELECT str_to_map('a:1,b:2'); ----- -{a: 1, b: 2} - # NULL input returns NULL query ? SELECT str_to_map(NULL, ',', ':'); From fc246875f3908cc473e42125c7ba88a04a76eab3 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 16:42:31 +0800 Subject: [PATCH 22/27] refactor: use as_string_array helpers and support Utf8/LargeUtf8/Utf8View Dispatch with explicit type matching per arg count (like parse_url), using datafusion_common::cast helpers instead of AsArray trait. 
Co-Authored-By: Claude Opus 4.6 --- .../spark/src/function/map/str_to_map.rs | 96 +++++++++++++++---- 1 file changed, 77 insertions(+), 19 deletions(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index 54feebd344b25..6d0a2ec39a3b3 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -19,8 +19,9 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, AsArray, MapBuilder, MapFieldNames, StringBuilder}; +use arrow::array::{ArrayRef, MapBuilder, MapFieldNames, StringBuilder, StringArrayType}; use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::cast::{as_large_string_array, as_string_array, as_string_view_array}; use datafusion_common::{Result, exec_err, internal_err}; use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -103,24 +104,81 @@ impl ScalarUDFImpl for SparkStrToMap { } fn str_to_map_inner(args: &[ArrayRef]) -> Result { - assert!( - !args.is_empty() && args.len() <= 3, - "str_to_map expects 1-3 arguments, got {}", - args.len() - ); - - let text_array = args[0].as_string::(); - let pair_delim_array = if args.len() > 1 { - Some(args[1].as_string::()) - } else { - None - }; - let kv_delim_array = if args.len() > 2 { - Some(args[2].as_string::()) - } else { - None - }; + match args.len() { + 1 => match args[0].data_type() { + DataType::Utf8 => { + str_to_map_impl(as_string_array(&args[0])?, None, None) + } + DataType::LargeUtf8 => { + str_to_map_impl(as_large_string_array(&args[0])?, None, None) + } + DataType::Utf8View => { + str_to_map_impl(as_string_view_array(&args[0])?, None, None) + } + other => exec_err!( + "Unsupported data type {other:?} for str_to_map, \ + expected Utf8, LargeUtf8, or Utf8View" + ), + }, + 2 => match (args[0].data_type(), args[1].data_type()) { + (DataType::Utf8, DataType::Utf8) => 
str_to_map_impl( + as_string_array(&args[0])?, + Some(as_string_array(&args[1])?), + None, + ), + (DataType::LargeUtf8, DataType::LargeUtf8) => str_to_map_impl( + as_large_string_array(&args[0])?, + Some(as_large_string_array(&args[1])?), + None, + ), + (DataType::Utf8View, DataType::Utf8View) => str_to_map_impl( + as_string_view_array(&args[0])?, + Some(as_string_view_array(&args[1])?), + None, + ), + (t1, t2) => exec_err!( + "Unsupported data types ({t1:?}, {t2:?}) for str_to_map, \ + expected matching Utf8, LargeUtf8, or Utf8View" + ), + }, + 3 => match ( + args[0].data_type(), + args[1].data_type(), + args[2].data_type(), + ) { + (DataType::Utf8, DataType::Utf8, DataType::Utf8) => str_to_map_impl( + as_string_array(&args[0])?, + Some(as_string_array(&args[1])?), + Some(as_string_array(&args[2])?), + ), + (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { + str_to_map_impl( + as_large_string_array(&args[0])?, + Some(as_large_string_array(&args[1])?), + Some(as_large_string_array(&args[2])?), + ) + } + (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { + str_to_map_impl( + as_string_view_array(&args[0])?, + Some(as_string_view_array(&args[1])?), + Some(as_string_view_array(&args[2])?), + ) + } + (t1, t2, t3) => exec_err!( + "Unsupported data types ({t1:?}, {t2:?}, {t3:?}) for str_to_map, \ + expected matching Utf8, LargeUtf8, or Utf8View" + ), + }, + n => exec_err!("str_to_map expects 1-3 arguments, got {n}"), + } +} +fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( + text_array: V, + pair_delim_array: Option, + kv_delim_array: Option, +) -> Result { let num_rows = text_array.len(); // Use field names matching map_type_from_key_value_types: "key" and "value" let field_names = MapFieldNames { @@ -190,7 +248,7 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow::array::{MapArray, StringArray}; + use arrow::array::{Array, MapArray, StringArray}; // Table-driven tests for 
str_to_map // Test cases derived from Spark ComplexTypeSuite: From 9c3093e3802bce34ec5bd0c3958ce174b79a242e Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 16:43:04 +0800 Subject: [PATCH 23/27] test: add per-row delimiter SLT test for str_to_map Addresses review comment: delimiters can vary per row when passed as columns rather than literals. Co-Authored-By: Claude Opus 4.6 --- .../sqllogictest/test_files/spark/map/str_to_map.slt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt index 671b8587fa189..7c7ea1677e0e9 100644 --- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt @@ -104,4 +104,12 @@ SELECT str_to_map(col, ',', '=') FROM (VALUES ('a=1,b=2'), ('x=9'), (NULL)) AS t ---- {a: 1, b: 2} {x: 9} +NULL + +# Per-row delimiters: each row can have different delimiters +query ? +SELECT str_to_map(col1, col2, col3) FROM (VALUES ('a=1,b=2', ',', '='), ('x#9', ',', '#'), (NULL, ',', '=')) AS t(col1, col2, col3); +---- +{a: 1, b: 2} +{x: 9} NULL \ No newline at end of file From 61dd3c1cff39f18cd324e65dd56229300b195a88 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 16:56:00 +0800 Subject: [PATCH 24/27] refactor: use NullBuffer::union for precomputed null handling in str_to_map Replace per-row is_null() checks with a precomputed combined NullBuffer using bitmap-level AND via NullBuffer::union, as suggested in PR review. 
Co-Authored-By: Claude Opus 4.6 --- .../spark/src/function/map/str_to_map.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index 6d0a2ec39a3b3..7f8e1acd99a2d 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -19,7 +19,8 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{ArrayRef, MapBuilder, MapFieldNames, StringBuilder, StringArrayType}; +use arrow::array::{Array, ArrayRef, MapBuilder, MapFieldNames, StringBuilder, StringArrayType}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cast::{as_large_string_array, as_string_array, as_string_view_array}; use datafusion_common::{Result, exec_err, internal_err}; @@ -180,6 +181,17 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( kv_delim_array: Option, ) -> Result { let num_rows = text_array.len(); + + // Precompute combined null buffer from all input arrays. + // NullBuffer::union performs a bitmap-level AND, which is more efficient + // than checking per-row nullability inline. 
+ let text_nulls = text_array.nulls().cloned(); + let pair_nulls = pair_delim_array.and_then(|a| a.nulls().cloned()); + let kv_nulls = kv_delim_array.and_then(|a| a.nulls().cloned()); + let combined_nulls = [text_nulls.as_ref(), pair_nulls.as_ref(), kv_nulls.as_ref()] + .into_iter() + .fold(None, |acc, nulls| NullBuffer::union(acc.as_ref(), nulls)); + // Use field names matching map_type_from_key_value_types: "key" and "value" let field_names = MapFieldNames { entry: "entries".to_string(), @@ -193,10 +205,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( ); for row_idx in 0..num_rows { - if text_array.is_null(row_idx) - || pair_delim_array.is_some_and(|a| a.is_null(row_idx)) - || kv_delim_array.is_some_and(|a| a.is_null(row_idx)) - { + if combined_nulls.as_ref().is_some_and(|n| n.is_null(row_idx)) { map_builder.append(false)?; continue; } From a1f54ee40d86a39718387f1ee40d67e438070a63 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 19:27:21 +0800 Subject: [PATCH 25/27] refactor: reuse HashSet across rows to avoid per-row heap allocation Move seen_keys HashSet outside the row loop and clear() it each iteration, reusing the backing allocation instead of allocating a new HashSet per row. 
--- datafusion/spark/src/function/map/str_to_map.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index 7f8e1acd99a2d..91311ef1ce7e2 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -204,6 +204,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( StringBuilder::new(), ); + let mut seen_keys = HashSet::new(); for row_idx in 0..num_rows { if combined_nulls.as_ref().is_some_and(|n| n.is_null(row_idx)) { map_builder.append(false)?; @@ -223,7 +224,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( continue; } - let mut seen_keys = HashSet::new(); + seen_keys.clear(); for pair in text.split(pair_delim) { if pair.is_empty() { continue; From bbbe517bb9064fb256c69f67648d5b659ba3fcc6 Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 19:41:15 +0800 Subject: [PATCH 26/27] refactor: move Rust tests to SLT, extract delimiter constants, match Spark error message - Remove redundant Rust unit tests (all covered by SLT) - Extract DEFAULT_PAIR_DELIM and DEFAULT_KV_DELIM constants - Match Spark's exact DUPLICATED_MAP_KEY error message - Add TODO for configurable mapKeyDedupPolicy (LAST_WIN) in follow-up PR --- .../spark/src/function/map/str_to_map.rs | 243 +----------------- .../test_files/spark/map/str_to_map.slt | 1 + 2 files changed, 12 insertions(+), 232 deletions(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index 91311ef1ce7e2..dd51b3a71cbaa 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -31,6 +31,9 @@ use datafusion_expr::{ use crate::function::map::utils::map_type_from_key_value_types; +const DEFAULT_PAIR_DELIM: &str = ","; +const DEFAULT_KV_DELIM: &str = ":"; + /// Spark-compatible `str_to_map` expression /// /// @@ -45,6 
+48,8 @@ use crate::function::map::utils::map_type_from_key_value_types; /// Uses EXCEPTION behavior (Spark 3.0+ default): errors on duplicate keys. /// See `spark.sql.mapKeyDedupPolicy`: /// +/// +/// TODO: Support configurable `spark.sql.mapKeyDedupPolicy` (LAST_WIN) in a follow-up PR. #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkStrToMap { signature: Signature, @@ -212,8 +217,8 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( } // Per-row delimiter extraction - let pair_delim = pair_delim_array.map_or(",", |a| a.value(row_idx)); - let kv_delim = kv_delim_array.map_or(":", |a| a.value(row_idx)); + let pair_delim = pair_delim_array.map_or(DEFAULT_PAIR_DELIM, |a| a.value(row_idx)); + let kv_delim = kv_delim_array.map_or(DEFAULT_KV_DELIM, |a| a.value(row_idx)); let text = text_array.value(row_idx); if text.is_empty() { @@ -234,12 +239,14 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( let key = kv_iter.next().unwrap_or(""); let value = kv_iter.next(); + // TODO: Support LAST_WIN policy via spark.sql.mapKeyDedupPolicy config // EXCEPTION policy: error on duplicate keys (Spark 3.0+ default) if !seen_keys.insert(key) { return exec_err!( "Duplicate map key '{key}' was found, please check the input data. \ - If you want to remove the duplicates, you can set \ - spark.sql.mapKeyDedupPolicy to LAST_WIN" + If you want to remove the duplicated keys, you can set \ + spark.sql.mapKeyDedupPolicy to \"LAST_WIN\" so that the key \ + inserted at last takes precedence." 
); } @@ -254,231 +261,3 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( Ok(Arc::new(map_builder.finish())) } - -#[cfg(test)] -mod tests { - use super::*; - use arrow::array::{Array, MapArray, StringArray}; - - // Table-driven tests for str_to_map - // Test cases derived from Spark ComplexTypeSuite: - // https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala#L525-L618 - #[test] - fn test_str_to_map_cases() { - struct TestCase { - name: &'static str, - inputs: Vec>, - pair_delim: Option<&'static str>, - kv_delim: Option<&'static str>, - expected: Vec)>>>, - } - - let cases = vec![ - TestCase { - name: "s0: basic default delimiters", - inputs: vec![Some("a:1,b:2,c:3")], - pair_delim: None, - kv_delim: None, - expected: vec![Some(vec![ - ("a", Some("1")), - ("b", Some("2")), - ("c", Some("3")), - ])], - }, - TestCase { - name: "s1: preserve spaces in values", - inputs: vec![Some("a: ,b:2")], - pair_delim: None, - kv_delim: None, - expected: vec![Some(vec![("a", Some(" ")), ("b", Some("2"))])], - }, - TestCase { - name: "s2: custom kv delimiter '='", - inputs: vec![Some("a=1,b=2,c=3")], - pair_delim: Some(","), - kv_delim: Some("="), - expected: vec![Some(vec![ - ("a", Some("1")), - ("b", Some("2")), - ("c", Some("3")), - ])], - }, - TestCase { - name: "s3: empty string", - inputs: vec![Some("")], - pair_delim: Some(","), - kv_delim: Some("="), - expected: vec![Some(vec![("", None)])], - }, - TestCase { - name: "s4: custom pair delimiter '_'", - inputs: vec![Some("a:1_b:2_c:3")], - pair_delim: Some("_"), - kv_delim: Some(":"), - expected: vec![Some(vec![ - ("a", Some("1")), - ("b", Some("2")), - ("c", Some("3")), - ])], - }, - TestCase { - name: "s5: single key no value", - inputs: vec![Some("a")], - pair_delim: None, - kv_delim: None, - expected: vec![Some(vec![("a", None)])], - }, - TestCase { - name: "s6: custom delimiters '&' and '='", - inputs: vec![Some("a=1&b=2&c=3")], - 
pair_delim: Some("&"), - kv_delim: Some("="), - expected: vec![Some(vec![ - ("a", Some("1")), - ("b", Some("2")), - ("c", Some("3")), - ])], - }, - TestCase { - name: "null input returns null", - inputs: vec![None], - pair_delim: None, - kv_delim: None, - expected: vec![None], - }, - TestCase { - name: "multi-row", - inputs: vec![Some("a:1,b:2"), Some("x:9"), None], - pair_delim: None, - kv_delim: None, - expected: vec![ - Some(vec![("a", Some("1")), ("b", Some("2"))]), - Some(vec![("x", Some("9"))]), - None, - ], - }, - ]; - - for case in cases { - let text: ArrayRef = Arc::new(StringArray::from(case.inputs)); - let args: Vec = match (case.pair_delim, case.kv_delim) { - (Some(p), Some(k)) => vec![ - text.clone(), - Arc::new(StringArray::from(vec![p; text.len()])), - Arc::new(StringArray::from(vec![k; text.len()])), - ], - _ => vec![text], - }; - - let result = str_to_map_inner(&args).unwrap(); - let map_array = result.as_any().downcast_ref::().unwrap(); - - assert_eq!(map_array.len(), case.expected.len(), "case: {}", case.name); - - for (row_idx, expected_row) in case.expected.iter().enumerate() { - match expected_row { - None => { - assert!( - map_array.is_null(row_idx), - "case: {} row {} expected NULL", - case.name, - row_idx - ); - } - Some(expected_entries) => { - assert!( - !map_array.is_null(row_idx), - "case: {} row {} unexpected NULL", - case.name, - row_idx - ); - let entries = get_map_entries(map_array, row_idx); - let expected: Vec<(String, Option)> = expected_entries - .iter() - .map(|(k, v)| (k.to_string(), v.map(|s| s.to_string()))) - .collect(); - assert_eq!( - entries, expected, - "case: {} row {}", - case.name, row_idx - ); - } - } - } - } - } - - #[test] - fn test_duplicate_keys_exception() { - let text: ArrayRef = Arc::new(StringArray::from(vec!["a:1,b:2,a:3"])); - let result = str_to_map_inner(&[text]); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Duplicate map key"), - "expected 
duplicate key error, got: {err_msg}" - ); - } - - #[test] - fn test_per_row_delimiters() { - // Each row has its own delimiters - let text: ArrayRef = - Arc::new(StringArray::from(vec![Some("a=1,b=2"), Some("x#9")])); - let pair_delim: ArrayRef = - Arc::new(StringArray::from(vec![Some(","), Some(",")])); - let kv_delim: ArrayRef = Arc::new(StringArray::from(vec![Some("="), Some("#")])); - - let result = str_to_map_inner(&[text, pair_delim, kv_delim]).unwrap(); - let map_array = result.as_any().downcast_ref::().unwrap(); - - assert_eq!(map_array.len(), 2); - - // Row 0: "a=1,b=2" with pair=",", kv="=" - let entries0 = get_map_entries(map_array, 0); - assert_eq!( - entries0, - vec![ - ("a".to_string(), Some("1".to_string())), - ("b".to_string(), Some("2".to_string())), - ] - ); - - // Row 1: "x#9" with pair=",", kv="#" - let entries1 = get_map_entries(map_array, 1); - assert_eq!(entries1, vec![("x".to_string(), Some("9".to_string()))]); - } - - fn get_map_entries( - map_array: &MapArray, - row: usize, - ) -> Vec<(String, Option)> { - if map_array.is_null(row) { - return vec![]; - } - let start = map_array.value_offsets()[row] as usize; - let end = map_array.value_offsets()[row + 1] as usize; - let keys = map_array - .keys() - .as_any() - .downcast_ref::() - .unwrap(); - let values = map_array - .values() - .as_any() - .downcast_ref::() - .unwrap(); - - (start..end) - .map(|i| { - let key = keys.value(i).to_string(); - let value = if values.is_null(i) { - None - } else { - Some(values.value(i).to_string()) - }; - (key, value) - }) - .collect() - } -} diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt index 7c7ea1677e0e9..dbe508dfca468 100644 --- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt @@ -66,6 +66,7 @@ SELECT str_to_map('a=1&b=2&c=3', '&', '='); {a: 1, b: 2, c: 3} # Duplicate keys: EXCEPTION policy 
(Spark 3.0+ default) +# TODO: Add LAST_WIN policy tests when spark.sql.mapKeyDedupPolicy config is supported statement error Duplicate map key SELECT str_to_map('a:1,b:2,a:3'); From 1e44dfabc7f50048f1da4ee6f2c9ba87040a7bac Mon Sep 17 00:00:00 2001 From: Eric Chang Date: Fri, 6 Feb 2026 19:52:50 +0800 Subject: [PATCH 27/27] chore: remove redundant duplicate key comment from SLT header Already documented at the duplicate key test case section. --- datafusion/spark/src/function/map/str_to_map.rs | 15 +++++++++------ .../test_files/spark/map/str_to_map.slt | 2 -- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index dd51b3a71cbaa..b722fb7abd6b2 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -19,10 +19,14 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, MapBuilder, MapFieldNames, StringBuilder, StringArrayType}; +use arrow::array::{ + Array, ArrayRef, MapBuilder, MapFieldNames, StringArrayType, StringBuilder, +}; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, FieldRef}; -use datafusion_common::cast::{as_large_string_array, as_string_array, as_string_view_array}; +use datafusion_common::cast::{ + as_large_string_array, as_string_array, as_string_view_array, +}; use datafusion_common::{Result, exec_err, internal_err}; use datafusion_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -112,9 +116,7 @@ impl ScalarUDFImpl for SparkStrToMap { fn str_to_map_inner(args: &[ArrayRef]) -> Result { match args.len() { 1 => match args[0].data_type() { - DataType::Utf8 => { - str_to_map_impl(as_string_array(&args[0])?, None, None) - } + DataType::Utf8 => str_to_map_impl(as_string_array(&args[0])?, None, None), DataType::LargeUtf8 => { str_to_map_impl(as_large_string_array(&args[0])?, 
None, None) } @@ -217,7 +219,8 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( } // Per-row delimiter extraction - let pair_delim = pair_delim_array.map_or(DEFAULT_PAIR_DELIM, |a| a.value(row_idx)); + let pair_delim = + pair_delim_array.map_or(DEFAULT_PAIR_DELIM, |a| a.value(row_idx)); let kv_delim = kv_delim_array.map_or(DEFAULT_KV_DELIM, |a| a.value(row_idx)); let text = text_array.value(row_idx); diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt index dbe508dfca468..30d1672aef0ae 100644 --- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt @@ -20,8 +20,6 @@ # # Test cases derived from Spark test("StringToMap"): # https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala#L525-L618 -# -# Duplicate key handling uses EXCEPTION policy (Spark 3.0+ default) # s0: Basic test with default delimiters query ?