From 14287e3f3651b0565b0c21cf5cc3fed44455bba5 Mon Sep 17 00:00:00 2001
From: Max Luebbering <2804731+le1nux@users.noreply.github.com>
Date: Tue, 17 Feb 2026 11:41:04 +0100
Subject: [PATCH 1/2] refactor: improved the throughput of the tokenized file
 writer by using more efficient data routines

---
 .../tokenization/tokenized_file_writer.py     | 59 +++++++++++++------
 1 file changed, 42 insertions(+), 17 deletions(-)

diff --git a/src/modalities/dataloader/preprocessing/tokenization/tokenized_file_writer.py b/src/modalities/dataloader/preprocessing/tokenization/tokenized_file_writer.py
index ed64488b6..d6c185bde 100644
--- a/src/modalities/dataloader/preprocessing/tokenization/tokenized_file_writer.py
+++ b/src/modalities/dataloader/preprocessing/tokenization/tokenized_file_writer.py
@@ -1,7 +1,6 @@
 import math
 import os
 import pickle
-from itertools import repeat
 from pathlib import Path
 from typing import BinaryIO
 
@@ -82,30 +81,56 @@ def _write_index_segment(file_descriptor: BinaryIO, index_list: list[tuple[int,
     def _write_data_segment(
         file_descriptor: BinaryIO, token_data: list[np.ndarray], token_size_in_bytes: int, write_batch_size: int
     ) -> list[tuple[int, int]]:
-        def encoded_token_to_bytes(encoded_token: int, token_size_in_bytes: int) -> bytes:
-            # Converts an token_ids to its byte representation.
-            try:
-                token_bytes = encoded_token.to_bytes(token_size_in_bytes, byteorder="little", signed=False)
-            except OverflowError as e:
-                raise ValueError(f"Token {encoded_token} cannot be represented by {token_size_in_bytes} bytes.") from e
-            return token_bytes
-
-        samples = []
-        index_list = []
+        # Fast path: vectorized cast + tobytes (no per-token Python work).
+        # Preserves little-endian unsigned representation and overflow checks.
+
+        if token_size_in_bytes == 1:
+            dtype = np.dtype("u1")
+        elif token_size_in_bytes == 2:
+            dtype = np.dtype("<u2")
+        elif token_size_in_bytes == 4:
+            dtype = np.dtype("<u4")
+        else:
+            raise ValueError(f"Unsupported token_size_in_bytes: {token_size_in_bytes}.")
+
+        max_allowed = 2 ** (8 * token_size_in_bytes) - 1
+
+        samples = []
+        index_list = []
+        pending = 0
         curr_offset = 0
         for tokens in token_data:
-            sample_token_byte_string = b"".join(
-                map(encoded_token_to_bytes, tokens.tolist(), repeat(token_size_in_bytes))
-            )
-            samples.append(sample_token_byte_string)
+            arr = np.asarray(tokens)
+
+            # ---------------- range / overflow check ------------------------
+            min_val = int(arr.min()) if arr.size > 0 else 0
+            max_val = int(arr.max()) if arr.size > 0 else 0
+            if min_val < 0 or max_val > max_allowed:
+                raise ValueError(
+                    f"Token values out of range for {token_size_in_bytes} bytes: "
+                    f"min={min_val}, max={max_val}, allowed=[0, {max_allowed}]"
+                )
+            # ----------------------------------------------------------------
+
+            # Cast to correct unsigned little-endian dtype
+            arr = np.asarray(arr, dtype=dtype, order="C")
+            sample_token_byte_string = arr.tobytes(order="C")
+
+            samples.append(sample_token_byte_string)
             index_list.append((curr_offset, len(sample_token_byte_string)))
             curr_offset += len(sample_token_byte_string)
-            if len(samples) % write_batch_size == 0:
+
+            pending += 1
+            if pending >= write_batch_size:
                 file_descriptor.write(b"".join(samples))
-                samples = []
+                samples.clear()
+                pending = 0
+
         if len(samples) > 0:
             file_descriptor.write(b"".join(samples))
+
         return index_list
 
     @staticmethod
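Note on the fast path above: the speedup comes from replacing a per-token Python call to int.to_bytes with a single NumPy cast followed by ndarray.tobytes, so the little-endian unsigned byte layout is produced in one C-level pass per sample. Below is a minimal, self-contained sketch of that idea; the helper name tokens_to_bytes and the 1/2/4-byte dtype table are illustrative assumptions, not code from the patch.

    import numpy as np

    def tokens_to_bytes(tokens: np.ndarray, token_size_in_bytes: int) -> bytes:
        # Hypothetical helper mirroring the vectorized path: one cast, one buffer dump.
        dtype = {1: np.dtype("u1"), 2: np.dtype("<u2"), 4: np.dtype("<u4")}[token_size_in_bytes]
        arr = np.asarray(tokens)
        max_allowed = 2 ** (8 * token_size_in_bytes) - 1
        if arr.size > 0 and (int(arr.min()) < 0 or int(arr.max()) > max_allowed):
            raise ValueError(f"Token values out of range for {token_size_in_bytes} bytes.")
        return np.asarray(arr, dtype=dtype, order="C").tobytes(order="C")

    # Per-token formulation of the same conversion, kept here only for comparison:
    # b"".join(int(t).to_bytes(token_size_in_bytes, byteorder="little", signed=False) for t in tokens)
    assert tokens_to_bytes(np.array([1, 17, 65_535]), 2) == b"\x01\x00\x11\x00\xff\xff"
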
From e97578ddbb71a786873fa34bb90dcee6bfafe184 Mon Sep 17 00:00:00 2001
From: Max Luebbering <2804731+le1nux@users.noreply.github.com>
Date: Sun, 22 Feb 2026 11:15:25 +0100
Subject: [PATCH 2/2] feat: introduced enforce_enough_tokens_in_dataset to
 enable a check that the dataset provides enough tokens

---
 src/modalities/config/instantiation_models.py | 21 +++++++++++--------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/modalities/config/instantiation_models.py b/src/modalities/config/instantiation_models.py
index 19d99b627..fd7fd3b78 100644
--- a/src/modalities/config/instantiation_models.py
+++ b/src/modalities/config/instantiation_models.py
@@ -1,3 +1,4 @@
+import logging
 import os
 from pathlib import Path
 from typing import Annotated, Any, Optional
@@ -27,6 +28,8 @@
 from modalities.util import warn_rank_0
 from modalities.utils.profilers.profilers import SteppableNoProfiler
 
+logger = logging.getLogger(__name__)
+
 
 class CudaEnvSettings(BaseModel):
     local_rank: Annotated[int, Field(strict=True, ge=0)]
@@ -46,6 +49,7 @@ class ConsistencyEnforcement(BaseModel):
     enforce_last_step_logged: bool = True
     enforce_last_step_evaluated: bool = True
     enforce_last_step_checkpointed: bool = True
+    enforce_enough_tokens_in_dataset: bool = True
 
 
 class Intervals(BaseModel):
@@ -192,15 +196,14 @@ def _check_last_step_checkpointed(self) -> "TrainingComponentsInstantiationModel
 
     @model_validator(mode="after")
     def _check_token_amount_in_dataset(self) -> "TrainingComponentsInstantiationModel":
-        if (
-            len(self.train_dataset) * self.settings.step_profile.sequence_length
-            < self.settings.training_target.num_target_tokens
-        ):
-            raise ValueError(
-                "Not enough tokens in the dataset. "
-                f"Actual: {len(self.train_dataset) * self.settings.step_profile.sequence_length}, "
-                f"Expected: >={self.settings.training_target.num_target_tokens}"
-            )
+        dataset_tokens = len(self.train_dataset) * self.settings.step_profile.sequence_length
+        expected_tokens = self.settings.training_target.num_target_tokens
+        if dataset_tokens < expected_tokens:
+            msg = f"Not enough tokens in dataset. Actual: {dataset_tokens}, Expected: >={expected_tokens}"
+            if self.settings.consistency_enforcement.enforce_enough_tokens_in_dataset:
+                raise ValueError(msg)
+            else:
+                logger.warning(msg)
         return self
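Note on the consistency flag above: with enforce_enough_tokens_in_dataset left at its default of True, the validator still raises when len(train_dataset) * sequence_length falls short of num_target_tokens; setting it to False downgrades the hard failure to a logged warning. A small standalone sketch of that decision logic follows, assuming a hypothetical helper name check_token_amount (the real check lives in the _check_token_amount_in_dataset validator shown in the diff).

    import logging

    logger = logging.getLogger(__name__)

    def check_token_amount(num_samples: int, sequence_length: int, num_target_tokens: int,
                           enforce_enough_tokens_in_dataset: bool = True) -> None:
        # Available tokens are approximated as number of samples times sequence length,
        # matching the validator's arithmetic.
        dataset_tokens = num_samples * sequence_length
        if dataset_tokens < num_target_tokens:
            msg = f"Not enough tokens in dataset. Actual: {dataset_tokens}, Expected: >={num_target_tokens}"
            if enforce_enough_tokens_in_dataset:
                raise ValueError(msg)
            logger.warning(msg)

    # 1,000 samples of length 2,048 cover 2,048,000 tokens; with enforcement disabled a
    # 10M-token target now only logs a warning instead of aborting config validation.
    check_token_amount(1_000, 2_048, 10_000_000, enforce_enough_tokens_in_dataset=False)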