Skip to content
90 changes: 63 additions & 27 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2452,7 +2452,18 @@ def nextset(self) -> Union[bool, None]:
return True

def _bulkcopy(
self, table_name: str, data: Iterable[Union[Tuple, List]], **kwargs
self,
table_name: str,
data: Iterable[Union[Tuple, List]],
batch_size: int = 0,
timeout: int = 30,
column_mappings: Optional[Union[List[str], List[Tuple[int, str]]]] = None,
keep_identity: bool = False,
check_constraints: bool = False,
table_lock: bool = False,
keep_nulls: bool = False,
fire_triggers: bool = False,
use_internal_transaction: bool = False,
): # pragma: no cover
"""
Perform bulk copy operation for high-performance data loading.
Expand All @@ -2471,20 +2482,38 @@ def _bulkcopy(
- The number of values in each row must match the number of columns
in the target table

**kwargs: Additional bulk copy options.
batch_size: Number of rows to send per batch. Default 0 uses server optimal.

timeout: Operation timeout in seconds. Default is 30.

column_mappings: Maps source data columns to target table column names.
Two formats supported:

Simple Format - List[str]:
List of destination column names in order. Position in list = source index.
Example: ['UserID', 'FirstName', 'Email']
Maps: index 0 → UserID, index 1 → FirstName, index 2 → Email

Advanced Format - List[Tuple[int, str]]:
Explicit index mapping. Allows skipping or reordering columns.
Each tuple is (source_index, target_column_name).
Example: [(0, 'UserID'), (1, 'FirstName'), (3, 'Email')]
Maps: index 0 → UserID, index 1 → FirstName, index 3 → Email (skips index 2)

When omitted: Columns are mapped by ordinal position (first data
column → first table column, second → second, etc.)

keep_identity: Preserve identity values from source data.

check_constraints: Check constraints during bulk copy.

column_mappings (List[Tuple[int, str]], optional):
Maps source data column indices to target table column names.
Each tuple is (source_index, target_column_name) where:
- source_index: 0-based index of the column in the source data
- target_column_name: Name of the target column in the database table
table_lock: Use table-level lock instead of row-level locks.

When omitted: Columns are mapped by ordinal position (first data
column → first table column, second → second, etc.)
keep_nulls: Preserve null values instead of using default values.

When specified: Only the mapped columns are inserted; unmapped
source columns are ignored, and unmapped target columns must
have default values or allow NULL.
fire_triggers: Fire insert triggers on the target table.

use_internal_transaction: Use an internal transaction for each batch.

Returns:
Dictionary with bulk copy results including:
Expand Down Expand Up @@ -2523,22 +2552,17 @@ def _bulkcopy(
f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
)

# Extract and validate kwargs with defaults
batch_size = kwargs.get("batch_size", None)
timeout = kwargs.get("timeout", 30)

# Validate batch_size type and value (only if explicitly provided)
if batch_size is not None:
if not isinstance(batch_size, (int, float)):
raise TypeError(
f"batch_size must be a positive integer, got {type(batch_size).__name__}"
)
if batch_size <= 0:
raise ValueError(f"batch_size must be positive, got {batch_size}")
# Validate batch_size type and value (0 means server optimal)
if not isinstance(batch_size, int):
raise TypeError(
f"batch_size must be a non-negative integer, got {type(batch_size).__name__}"
)
if batch_size < 0:
raise ValueError(f"batch_size must be non-negative, got {batch_size}")

# Validate timeout type and value
if not isinstance(timeout, (int, float)):
raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}")
if not isinstance(timeout, int):
raise TypeError(f"timeout must be a positive integer, got {type(timeout).__name__}")
if timeout <= 0:
raise ValueError(f"timeout must be positive, got {timeout}")

Expand Down Expand Up @@ -2599,7 +2623,19 @@ def _bulkcopy(
pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
pycore_cursor = pycore_connection.cursor()

result = pycore_cursor.bulkcopy(table_name, iter(data), **kwargs)
result = pycore_cursor.bulkcopy(
table_name,
iter(data),
batch_size=batch_size,
timeout=timeout,
column_mappings=column_mappings,
keep_identity=keep_identity,
check_constraints=check_constraints,
table_lock=table_lock,
keep_nulls=keep_nulls,
fire_triggers=fire_triggers,
use_internal_transaction=use_internal_transaction,
)

return result

Expand Down
Loading