diff --git a/CHANGES/7218.bugfix b/CHANGES/7218.bugfix
new file mode 100644
index 0000000000..229276c833
--- /dev/null
+++ b/CHANGES/7218.bugfix
@@ -0,0 +1 @@
+Implemented atomic hashing and chunking in PulpExport, so exported files are hashed as they are written instead of being re-read from disk afterwards.
\ No newline at end of file
diff --git a/pulp_file/tests/functional/api/test_pulp_export.py b/pulp_file/tests/functional/api/test_pulp_export.py
index 71e6381625..eb3f16cb51 100644
--- a/pulp_file/tests/functional/api/test_pulp_export.py
+++ b/pulp_file/tests/functional/api/test_pulp_export.py
@@ -1,4 +1,6 @@
 import json
+from pathlib import Path
+
 import pytest
 import uuid
 
@@ -420,3 +422,43 @@ def test_export_with_meta(pulpcore_bindings, pulp_export_factory, full_pulp_expo
     assert meta_json.get("purpose") == "export"
     # overridden field check
     assert meta_json.get("checksum_type") == "crc32"
+
+
+@pytest.mark.parallel
+def test_export_chunk_ordering_and_naming(
+    pulp_exporter_factory,
+    pulp_export_factory,
+    three_synced_repositories,
+):
+    exporter = pulp_exporter_factory(repositories=[three_synced_repositories[0]])
+    chunk_size_bytes = 100
+    body = {"chunk_size": f"{chunk_size_bytes}B"}
+    export = pulp_export_factory(exporter, body)
+
+    all_paths = [Path(p) for p in export.output_file_info.keys()]
+    tar_chunks = sorted(p for p in all_paths if ".tar." in p.name)
+
+    assert len(tar_chunks) > 1, f"Expected multiple chunks for {chunk_size_bytes}B limit."
+
+    for index, path in enumerate(tar_chunks):
+        expected_suffix = f"{index:04d}"
+
+        assert path.name.endswith(expected_suffix), f"Chunk {path} missing suffix {expected_suffix}"
+        assert path.exists(), f"Chunk file {path} was not found on disk."
+
+        if index < len(tar_chunks) - 1:
+            assert path.stat().st_size == chunk_size_bytes
+
+    toc_path = Path(export.toc_info["file"])
+    with toc_path.open("r", encoding="utf-8") as f:
+        toc_data = json.load(f)
+
+    toc_filenames = list(toc_data["files"].keys())
+    expected_filenames = [p.name for p in tar_chunks]
+
+    assert (
+        toc_filenames == expected_filenames
+    ), f"TOC order mismatch.\nExpected: {expected_filenames}\nActual: {toc_filenames}"
+
+    assert toc_data["meta"]["chunk_size"] == chunk_size_bytes
+    assert toc_data["meta"]["checksum_type"] == "crc32"
diff --git a/pulpcore/app/tasks/export.py b/pulpcore/app/tasks/export.py
index 418f6d622a..4bbca2ca09 100644
--- a/pulpcore/app/tasks/export.py
+++ b/pulpcore/app/tasks/export.py
@@ -2,7 +2,6 @@
 import logging
 import os
 import os.path
-import subprocess
 import tarfile
 
 from distutils.util import strtobool
@@ -28,7 +27,7 @@
 
 from pulpcore.app.models.content import ContentArtifact
 from pulpcore.app.serializers import PulpExportSerializer
-from pulpcore.app.util import compute_file_hash, Crc32Hasher
+from pulpcore.app.util import compute_file_hash, Crc32Hasher, HashingFileWriter
 from pulpcore.app.importexport import (
     export_versions,
     export_artifacts,
@@ -394,55 +393,26 @@ def pulp_export(exporter_pk, params):
         if not path.is_dir():
             path.mkdir(mode=0o775, parents=True)
 
-        rslts = {}
-        if the_export.validated_chunk_size:
-            # write it into chunks
-            with subprocess.Popen(
-                [
-                    "split",
-                    "-a",
-                    "4",
-                    "-b",
-                    str(the_export.validated_chunk_size),
-                    "-d",
-                    "-",
-                    tarfile_fp + ".",
-                ],
-                stdin=subprocess.PIPE,
-            ) as split_process:
-                try:
-                    with tarfile.open(
-                        tarfile_fp,
-                        "w|",
-                        fileobj=split_process.stdin,
-                    ) as tar:
-                        _do_export(pulp_exporter, tar, the_export)
-                except Exception:
-                    # no matter what went wrong, we can't trust the files we (may have) created.
-                    # Delete the ones we can find and pass the problem up.
-                    for pathname in glob(tarfile_fp + ".*"):
-                        os.remove(pathname)
-                    raise
-            # compute the hashes
-            paths = sorted([str(Path(p)) for p in glob(tarfile_fp + ".*")])
-            for a_file in paths:
-                a_hash = compute_file_hash(a_file, hasher=hasher())
-                rslts[a_file] = a_hash
+        writer = HashingFileWriter(
+            base_path=tarfile_fp,
+            chunk_size=the_export.validated_chunk_size or 0,
+            hasher_cls=hasher,
+        )
 
-        else:
-            # write into the file
-            try:
-                with tarfile.open(tarfile_fp, "w") as tar:
+        try:
+            with writer:
+                with tarfile.open(fileobj=writer, mode="w|") as tar:
                     _do_export(pulp_exporter, tar, the_export)
-            except Exception:
-                # no matter what went wrong, we can't trust the file we created.
-                # Delete it if it exists and pass the problem up.
-                if os.path.exists(tarfile_fp):
-                    os.remove(tarfile_fp)
-                raise
-            # compute the hash
-            tarfile_hash = compute_file_hash(tarfile_fp, hasher=hasher())
-            rslts[tarfile_fp] = tarfile_hash
+        except Exception:
+            # no matter what went wrong, we can't trust the files we (may have) created.
+            # Delete the ones we can find (chunked or not) and pass the problem up.
+            if os.path.exists(tarfile_fp):
+                os.remove(tarfile_fp)
+            for pathname in glob(tarfile_fp + ".*"):
+                os.remove(pathname)
+            raise
+
+        rslts = writer.results
 
         # store the outputfile/hash info
         the_export.output_file_info = rslts
diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py
index 919cc5ec47..98e8bc41a4 100644
--- a/pulpcore/app/util.py
+++ b/pulpcore/app/util.py
@@ -3,6 +3,11 @@
 import os
 import socket
 import tempfile
+from io import RawIOBase
+from pathlib import Path
+from types import TracebackType
+from typing import Self, IO, Any
+
 import gnupg
 
 from functools import lru_cache
@@ -649,3 +654,154 @@ def normalize_http_status(status):
         return "5xx"
     else:
         return ""
+
+
+class HashingFileWriter(RawIOBase):
+    """
+    A file-like object that writes data to disk while hashing it in the same
+    pass. It supports both single-file writing and chunk-based splitting.
+    """
+
+    def __init__(
+        self,
+        base_path: str | Path,
+        hasher_cls: Any,
+        chunk_size: int = 0,
+        suffix_length: int = 4,
+        dir_mode: int = 0o775,
+    ) -> None:
+        """
+        Args:
+            base_path: The target file path.
+            hasher_cls: The hashing class to use (e.g., hashlib.sha256).
+            chunk_size: Max bytes per file. 0 (or less) disables splitting.
+            suffix_length: Length of the numeric suffix for split files (e.g., .0001).
+            dir_mode: The octal file permissions to use when creating parent
+                directories (e.g., 0o775).
+ """ + super().__init__() + + self.base_path = Path(base_path) + self.chunk_size = max(0, chunk_size) + self.hasher_cls = hasher_cls + self.suffix_length = suffix_length + self.dir_mode = dir_mode + + self._file_index: int = 0 + self._current_file: IO[bytes] | None = None + self._current_hasher: Any | None = None + self._current_file_path: Path | None = None + + self._current_chunk_written: int = 0 + self._total_written: int = 0 + + # Maps file_path string -> hash digest + self.results: dict[str, str] = {} + + @property + def is_splitting(self) -> bool: + return self.chunk_size > 0 + + def _get_filename(self) -> Path: + """Determines the filename based on splitting mode.""" + if not self.is_splitting: + return self.base_path + + suffix = f".{self._file_index:0{self.suffix_length}d}" + return self.base_path.with_name(f"{self.base_path.name}{suffix}") + + def _rotate_file(self) -> None: + """Closes the current file and opens the next one.""" + self._close_current_file() + + self._current_file_path = self._get_filename() + self._current_file_path.parent.mkdir(parents=True, exist_ok=True, mode=self.dir_mode) + + self._current_file = self._current_file_path.open("wb") + self._current_hasher = self.hasher_cls() + self._current_chunk_written = 0 + + if self.is_splitting: + self._file_index += 1 + + def _close_current_file(self) -> None: + """Finalizes the current file and stores its hash.""" + if self._current_file: + self._current_file.close() + + if self._current_hasher and self._current_file_path: + self.results[str(self._current_file_path)] = self._current_hasher.hexdigest() + + self._current_file = None + self._current_hasher = None + + def write(self, data: bytes) -> int: + # Early exit for empty writes to prevent creating empty files + if not data: + return 0 + + if not self._current_file: + self._rotate_file() + + # If splitting is disabled, strict write without rotation logic + if not self.is_splitting: + assert self._current_file is not None + assert self._current_hasher is not None + self._current_file.write(data) + self._current_hasher.update(data) + self._total_written += len(data) + return len(data) + + # Splitting logic + buffer = memoryview(data) + bytes_remaining = len(buffer) + cursor = 0 + + while bytes_remaining > 0: + assert self._current_file is not None + assert self._current_hasher is not None + + space_left = self.chunk_size - self._current_chunk_written + + # If the current file is full, rotate immediately before writing more + if space_left <= 0: + self._rotate_file() + space_left = self.chunk_size + + to_write = min(bytes_remaining, space_left) + chunk = buffer[cursor : cursor + to_write] + + self._current_file.write(chunk) + self._current_hasher.update(chunk) + + self._current_chunk_written += to_write + self._total_written += to_write + cursor += to_write + bytes_remaining -= to_write + + return len(data) + + def writable(self) -> bool: + return True + + def tell(self) -> int: + return self._total_written + + def flush(self) -> None: + if self._current_file: + self._current_file.flush() + + def close(self) -> None: + self._close_current_file() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.close() diff --git a/pulpcore/tests/unit/test_util.py b/pulpcore/tests/unit/test_util.py index 747385dc92..0e4460fd8c 100644 --- a/pulpcore/tests/unit/test_util.py +++ b/pulpcore/tests/unit/test_util.py @@ -1,8 +1,13 @@ +import 
+import tempfile
+import unittest
+from pathlib import Path
+
 import pytest
 from unittest import mock
 
 from pulpcore.app import models, util
+from pulpcore.app.util import HashingFileWriter
 
 pytestmark = pytest.mark.usefixtures("fake_domain")
 
 
@@ -30,3 +36,133 @@ def test_get_view_name_for_model_not_found(monkeypatch):
     monkeypatch.setattr(util, "get_viewset_for_model", mock.Mock())
     with pytest.raises(LookupError):
         util.get_view_name_for_model(mock.Mock(), "foo")
+
+
+class TestHashingFileWriter(unittest.TestCase):
+    def setUp(self) -> None:
+        self.test_dir_obj = tempfile.TemporaryDirectory()
+        self.test_dir = Path(self.test_dir_obj.name)
+        self.base_path = self.test_dir / "export.tar"
+
+    def tearDown(self) -> None:
+        self.test_dir_obj.cleanup()
+
+    def test_chunk_size_zero_creates_single_file(self) -> None:
+        """Verify chunk_size=0 creates one file with no suffix."""
+        data = b"x" * 1024
+
+        with HashingFileWriter(self.base_path, hasher_cls=hashlib.sha256, chunk_size=0) as writer:
+            writer.write(data)
+
+        self.assertTrue(self.base_path.exists())
+        self.assertFalse((self.base_path.parent / "export.tar.0000").exists())
+
+        self.assertEqual(self.base_path.read_bytes(), data)
+        expected_hash = hashlib.sha256(data).hexdigest()
+        self.assertEqual(writer.results[str(self.base_path)], expected_hash)
+
+    def test_exact_boundary_split(self) -> None:
+        """
+        If we write exactly chunk_size bytes, we should get exactly one file.
+        Only a write of one byte MORE should open a second file.
+        """
+        chunk = 10
+        data = b"A" * 10
+
+        with HashingFileWriter(
+            self.base_path, hasher_cls=hashlib.sha256, chunk_size=chunk
+        ) as writer:
+            writer.write(data)
+
+        # Should be exactly 1 file: .0000
+        self.assertTrue((self.test_dir / "export.tar.0000").exists())
+        self.assertFalse((self.test_dir / "export.tar.0001").exists())
+        self.assertEqual((self.test_dir / "export.tar.0000").stat().st_size, 10)
+
+    def test_overflow_boundary_split(self) -> None:
+        """Writing chunk_size + 1 bytes should create two files."""
+        chunk = 10
+        data = b"A" * 11
+
+        with HashingFileWriter(
+            self.base_path, hasher_cls=hashlib.sha256, chunk_size=chunk
+        ) as writer:
+            writer.write(data)
+
+        f0 = self.test_dir / "export.tar.0000"
+        f1 = self.test_dir / "export.tar.0001"
+
+        self.assertTrue(f0.exists())
+        self.assertTrue(f1.exists())
+        self.assertEqual(f0.stat().st_size, 10)
+        self.assertEqual(f1.stat().st_size, 1)
+
+    def test_hasher_content(self) -> None:
+        """Ensure we can swap in a different hasher (e.g., md5)."""
+        data = b"check"
+        with HashingFileWriter(self.base_path, hasher_cls=hashlib.md5, chunk_size=0) as writer:
+            writer.write(data)
+
+        expected = hashlib.md5(data).hexdigest()
+        self.assertEqual(writer.results[str(self.base_path)], expected)
+
+    def test_nested_directory_creation(self) -> None:
+        """Ensure parent directories are created if they don't exist."""
+        nested_path = self.test_dir / "subdir" / "deep" / "archive.tar"
+
+        with HashingFileWriter(nested_path, hasher_cls=hashlib.sha256, chunk_size=0) as writer:
+            writer.write(b"content")
+
+        self.assertTrue(nested_path.exists())
+
+    def test_multiple_writes_cross_boundary(self) -> None:
+        """Verify multiple small writes correctly trigger file rotation."""
+        chunk = 10
+        # 3 writes of 4 bytes = 12 bytes total. Should create 2 files.
+        with HashingFileWriter(
+            self.base_path, hasher_cls=hashlib.sha256, chunk_size=chunk
+        ) as writer:
+            writer.write(b"aaaa")
+            writer.write(b"bbbb")
+            writer.write(b"cccc")
+
+        f0, f1 = self.test_dir / "export.tar.0000", self.test_dir / "export.tar.0001"
+        self.assertEqual(f0.read_bytes(), b"aaaabbbbcc")
+        self.assertEqual(f1.read_bytes(), b"cc")
+
+    def test_results_insertion_order(self) -> None:
+        """Verify the results dictionary preserves the order of file creation."""
+        chunk = 5
+        data = b"123456789012345"  # 15 bytes -> 3 chunks
+        with HashingFileWriter(
+            self.base_path, hasher_cls=hashlib.sha256, chunk_size=chunk
+        ) as writer:
+            writer.write(data)
+
+        expected_keys = [
+            str(self.base_path.with_name("export.tar.0000")),
+            str(self.base_path.with_name("export.tar.0001")),
+            str(self.base_path.with_name("export.tar.0002")),
+        ]
+        self.assertEqual(list(writer.results.keys()), expected_keys)
+
+    def test_write_empty_bytes(self) -> None:
+        """Ensure writing empty bytes doesn't create a file or change state."""
+        with HashingFileWriter(self.base_path, hasher_cls=hashlib.sha256, chunk_size=10) as writer:
+            writer.write(b"")
+
+        # No file should be created if no data was ever actually written
+        self.assertEqual(len(writer.results), 0)
+        self.assertFalse((self.test_dir / "export.tar.0000").exists())
+
+    def test_large_chunk_rotation(self) -> None:
+        """Verify data much larger than chunk_size splits into many files."""
+        chunk = 10
+        data = b"X" * 35  # Should create chunks 0000, 0001, 0002, 0003
+        with HashingFileWriter(
+            self.base_path, hasher_cls=hashlib.sha256, chunk_size=chunk
+        ) as writer:
+            writer.write(data)
+
+        self.assertEqual(len(writer.results), 4)
+        self.assertEqual((self.test_dir / "export.tar.0003").stat().st_size, 5)
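
---

Usage sketch (not part of the patch): a minimal, self-contained illustration of how
HashingFileWriter is driven, mirroring the way pulp_export() wires it up above. It
assumes a configured pulpcore environment so that pulpcore.app.util is importable;
the temporary directory, the 4096-byte chunk size, and the /etc/hosts input are
arbitrary example values.

import hashlib
import tarfile
import tempfile
from pathlib import Path

from pulpcore.app.util import HashingFileWriter

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "demo.tar"

    # Stream a tarball through the writer, splitting every 4096 bytes.
    # tarfile's "w|" (stream) mode never seeks, so a forward-only writer suffices.
    with HashingFileWriter(base, hasher_cls=hashlib.sha256, chunk_size=4096) as writer:
        with tarfile.open(fileobj=writer, mode="w|") as tar:
            tar.add("/etc/hosts", arcname="hosts")

    # Closing the writer finalizes the last chunk; results maps each chunk path
    # to its hex digest in creation order (demo.tar.0000, .0001, ...).
    for path, digest in writer.results.items():
        print(path, digest)

Reassembling with `cat demo.tar.0* > demo.tar` restores the original stream
byte-for-byte, which is what keeps the per-chunk hashes verifiable against the
TOC on the import side.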