diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index dd31250..1fd5a6c 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -13,7 +13,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest, macos-latest] + os: [ubuntu-latest] python-version: ['3.9', '3.10', '3.11', '3.12'] fail-fast: false diff --git a/relink.py b/relink.py index 4778ad7..686cef2 100644 --- a/relink.py +++ b/relink.py @@ -10,6 +10,7 @@ import argparse import logging import time +from pathlib import Path DEFAULT_SOURCE_ROOT = "/glade/campaign/cesm/cesmdata/cseg/inputdata/" DEFAULT_TARGET_ROOT = ( @@ -34,7 +35,103 @@ def always(self, message, *args, **kwargs): logging.Logger.always = always -def find_owned_files_scandir(directory, user_uid): +def _handle_non_dir_entry(entry: os.DirEntry, user_uid: int): + """ + Check if a non-directory entry is owned by the user and should be processed. + + Args: + entry (os.DirEntry): A directory entry from os.scandir(). + user_uid (int): The UID of the user whose files to find. + + Returns: + str or None: The absolute path to the file if it's owned by the user + and is a regular file (not a symlink), otherwise None. + """ + # Is this even owned by the user? + if entry.stat(follow_symlinks=False).st_uid == user_uid: + + # Return if it's a file (not following symlinks) + if entry.is_file(follow_symlinks=False): + return entry.path + + # Log about skipping symlinks + if entry.is_symlink(): + logger.debug("Skipping symlink: %s", entry.path) + + return None + + +def _handle_non_dir_str(path: str, user_uid: int): + """ + Check if a non-directory string is owned by the user and should be processed. This should only + ever be needed if the user specified a file to process on the command line. Because we don't + expect users to process large numbers of files at once in this way, it's okay if this function + isn't performance-optimized. + + Args: + path (str): A filesystem path. + user_uid (int): The UID of the user whose files to find. + + Returns: + str or None: The absolute path to the file if it's owned by the user + and is a regular file (not a symlink), otherwise None. + """ + # Is this even owned by the user? + if os.stat(path, follow_symlinks=False).st_uid == user_uid: + + is_file = os.path.isfile(path) + is_symlink = os.path.islink(path) + + # Log about skipping symlinks + if is_symlink: + logger.debug("Skipping symlink: %s", path) + + # Return if it's a file (and not a symlink) + elif is_file: + return path + + return None + + +def handle_non_dir(var, user_uid): + """ + Check if a non-directory is owned by the user and should be processed. Passes var to a + helper function depending on its type. + + Args: + var (os.DirEntry or str): A directory entry from os.scandir(), or a string path. + user_uid (int): The UID of the user whose files to find. + + Returns: + str or None: The absolute path to the file if it's owned by the user + and is a regular file (not a symlink), otherwise None. + + Raises: + TypeError: If var is not a DirEntry-like object. + """ + + # Handle a variable of type str. + if isinstance(var, str): + file_path = _handle_non_dir_str(var, user_uid) + + # Handle a variable of type like os.DirEntry. + # Fall back to duck typing: If var has the required DirEntry methods and members, treat it as a + # DirEntry. This is necessary for this conditional to work with the MockDirEntry type used in + # testing. ("If it looks, walks, and quacks like a duck...") + elif isinstance(var, os.DirEntry) or all( + hasattr(var, m) for m in ["stat", "is_file", "is_symlink", "path"] + ): + file_path = _handle_non_dir_entry(var, user_uid) + + else: + raise TypeError( + f"Unsure how to handle non-directory variable of type {type(var)}" + ) + + return file_path + + +def find_owned_files_scandir(item, user_uid, inputdata_root=DEFAULT_SOURCE_ROOT): """ Efficiently find all files owned by a specific user using os.scandir(). @@ -42,53 +139,60 @@ def find_owned_files_scandir(directory, user_uid): information during directory traversal, reducing system calls. Args: - directory (str): The root directory to search. + item (str): The root directory to search, or the file to check. user_uid (int): The UID of the user whose files to find. + inputdata_root (str): The root of the directory tree containing CESM input data. Yields: str: Absolute paths to files owned by the user. + + Raises: + ValueError: If any file found is not under inputdata_root. """ try: - with os.scandir(directory) as entries: + with os.scandir(item) as entries: for entry in entries: try: - # Check if it's a file (not following symlinks) - if entry.is_file(follow_symlinks=False): - # Get stat info (cached by scandir, very efficient) - stat_info = entry.stat(follow_symlinks=False) - - if stat_info.st_uid == user_uid: - yield entry.path - # Recursively process directories (not following symlinks) - elif entry.is_dir(follow_symlinks=False): - yield from find_owned_files_scandir(entry.path, user_uid) + if entry.is_dir(follow_symlinks=False): + yield from find_owned_files_scandir( + entry.path, user_uid, inputdata_root + ) - # Skip symlinks - elif entry.is_symlink(): - logger.info("Skipping symlink: %s", entry.path) + # Things other than directories are handled separately + elif ( + entry_path := handle_non_dir(entry, user_uid) + ) is not None: + yield entry_path except (OSError, PermissionError) as e: - logger.debug("Error accessing %s: %s. Skipping.", entry.path, e) + logger.error("Error accessing %s: %s. Skipping.", entry.path, e) continue + except NotADirectoryError: + if (file_path := handle_non_dir(item, user_uid)) is not None: + yield file_path + except (OSError, PermissionError) as e: - logger.debug("Error accessing %s: %s. Skipping.", directory, e) + logger.error("Error accessing %s: %s. Skipping.", item, e) -def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False): +def replace_files_with_symlinks( + item_to_process, target_dir, username, inputdata_root=DEFAULT_SOURCE_ROOT, dry_run=False +): """ Finds files owned by a specific user in a source directory tree, deletes them, and replaces them with symbolic links to the same relative path in a target directory tree. Args: - source_dir (str): The root of the directory tree to search for files. + item_to_process (str): The root directory to search, or the file to process. target_dir (str): The root of the directory tree containing the new files. + inputdata_root (str): The root of the directory tree containing CESM input data. username (str): The name of the user whose files will be processed. dry_run (bool): If True, only show what would be done without making changes. """ - source_dir = os.path.abspath(source_dir) + item_to_process = os.path.abspath(item_to_process) target_dir = os.path.abspath(target_dir) # Get the user ID (UID) for the specified username @@ -105,83 +209,121 @@ def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False) "Searching for files owned by '%s' (UID: %s) in '%s'...", username, user_uid, - source_dir, + item_to_process, ) # Use efficient scandir-based search - for file_path in find_owned_files_scandir(source_dir, user_uid): - logger.info("Found owned file: %s", file_path) - - # Determine the relative path and the new link's destination - relative_path = os.path.relpath(file_path, source_dir) - link_target = os.path.join(target_dir, relative_path) - - # Check if the target file actually exists - if not os.path.exists(link_target): - logger.warning( - "Warning: Corresponding file not found in '%s' for '%s'. Skipping.", - target_dir, - file_path, - ) - continue + for file_path in find_owned_files_scandir(item_to_process, user_uid, inputdata_root): + replace_one_file_with_symlink(inputdata_root, target_dir, file_path, dry_run=dry_run) - # Get the link name - link_name = file_path - if dry_run: - logger.info( - "[DRY RUN] Would create symbolic link: %s -> %s", - link_name, - link_target, - ) - continue - - # Remove the original file - try: - os.rename(link_name, link_name + ".tmp") - logger.info("Deleted original file: %s", link_name) - except OSError as e: - logger.error("Error deleting file %s: %s. Skipping.", link_name, e) - continue - - # Create the symbolic link, handling necessary parent directories - try: - # Create parent directories for the link if they don't exist - os.makedirs(os.path.dirname(link_name), exist_ok=True) - os.symlink(link_target, link_name) - os.remove(link_name + ".tmp") - logger.info("Created symbolic link: %s -> %s", link_name, link_target) - except OSError as e: - os.rename(link_name + ".tmp", link_name) - logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e) +def replace_one_file_with_symlink( + inputdata_root, target_dir, file_path, dry_run=False +): + """ + Given a file, replaces it with a symbolic link to the same relative path in a target directory + tree. + Args: + inputdata_root (str): The root of the directory tree containing CESM input data. + target_dir (str): The root of the directory tree containing the new files. + file_path (str): The path of the file to be replaced. + dry_run (bool): If True, only show what would be done without making changes. + """ + logger.info("Found owned file: %s", file_path) + + # Determine the relative path and the new link's destination + relative_path = os.path.relpath(file_path, inputdata_root) + link_target = os.path.join(target_dir, relative_path) + + # Check if the target file actually exists + if not os.path.exists(link_target): + logger.warning( + "Warning: Corresponding file '%s' not found for '%s'. Skipping.", + link_target, + file_path, + ) + return -def validate_directory(path): + # Get the link name + link_name = file_path + + if dry_run: + logger.info( + "[DRY RUN] Would create symbolic link: %s -> %s", + link_name, + link_target, + ) + return + + # Remove the original file + try: + os.rename(link_name, link_name + ".tmp") + logger.info("Deleted original file: %s", link_name) + except OSError as e: + logger.error("Error deleting file %s: %s. Skipping.", link_name, e) + return + + # Create the symbolic link, handling necessary parent directories + try: + # Create parent directories for the link if they don't exist + os.makedirs(os.path.dirname(link_name), exist_ok=True) + os.symlink(link_target, link_name) + os.remove(link_name + ".tmp") + logger.info("Created symbolic link: %s -> %s", link_name, link_target) + except OSError as e: + os.rename(link_name + ".tmp", link_name) + logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e) + + +def validate_paths(path, check_is_dir=False): """ - Validate that the path exists and is a directory. + Validate that one or more paths exist. Args: - path (str): The path to validate. + path (str or list): The path to validate, or a list of such paths. Returns: - str: The absolute path if valid. + str or list: The absolute path(s) if valid. Raises: - argparse.ArgumentTypeError: If path doesn't exist or is not a directory. + argparse.ArgumentTypeError: If a path doesn't exist. """ + if isinstance(path, list): + result = [] + for item in path: + result.append(validate_paths(item, check_is_dir=check_is_dir)) + return result + if not os.path.exists(path): - raise argparse.ArgumentTypeError(f"Directory '{path}' does not exist") - if not os.path.isdir(path): + raise argparse.ArgumentTypeError(f"'{path}' does not exist") + if check_is_dir and not os.path.isdir(path): raise argparse.ArgumentTypeError(f"'{path}' is not a directory") return os.path.abspath(path) +def validate_directory(path): + """ + Validate that one or more directories exist. + + Args: + path (str or list): The directory to validate, or a list of such directories. + + Returns: + str or list: The absolute path(s) if valid. + + Raises: + argparse.ArgumentTypeError: If a path doesn't exist. + """ + return validate_paths(path, check_is_dir=True) + + def parse_arguments(): """ Parse command-line arguments. Returns: - argparse.Namespace: Parsed arguments containing source_root, + argparse.Namespace: Parsed arguments containing items_to_process, target_root, and verbosity settings. """ parser = argparse.ArgumentParser( @@ -190,11 +332,12 @@ def parse_arguments(): ) ) parser.add_argument( - "--source-root", - type=validate_directory, + "items_to_process", + nargs="*", default=DEFAULT_SOURCE_ROOT, + type=validate_paths, help=( - f"The root of the directory tree to search for files (default: {DEFAULT_SOURCE_ROOT})" + f"One or more (directories to search for) files (default: {DEFAULT_SOURCE_ROOT})" ), ) parser.add_argument( @@ -207,6 +350,16 @@ def parse_arguments(): ), ) + # The root of the directory tree containing CESM input data. + # ONLY INTENDED FOR USE IN TESTING + parser.add_argument( + "--inputdata-root", + "-inputdata", # to match rimport + type=validate_directory, + default=DEFAULT_SOURCE_ROOT, + help=argparse.SUPPRESS, + ) + # Verbosity options (mutually exclusive) verbosity_group = parser.add_mutually_exclusive_group() verbosity_group.add_argument( @@ -254,8 +407,37 @@ def process_args(args): else: args.log_level = logging.INFO + # Ensure that items_to_process is a list + if hasattr(args, "items_to_process") and not isinstance(args.items_to_process, list): + args.items_to_process = [args.items_to_process] + + # Check that everything is an absolute path (should have been converted, if needed, during + # validate_paths). + if hasattr(args, "items_to_process"): + for item in args.items_to_process: + assert os.path.isabs(item) + if hasattr(args, "target_root"): + assert os.path.isabs(args.target_root) + + # Check that every item in items_to_process is a child of inputdata_root + if hasattr(args, "items_to_process"): # Sometimes doesn't if we're testing + for item in args.items_to_process: + if not Path(item).is_relative_to(args.inputdata_root): + raise argparse.ArgumentTypeError( + f"Item '{item}' not under inputdata root '{args.inputdata_root}'" + ) + + # Check that target_root is NOT a child of inputdata_root + if hasattr(args, "target_root"): # Sometimes doesn't if we're testing + if Path(args.target_root).is_relative_to(args.inputdata_root): + raise argparse.ArgumentTypeError( + f"Target root ('{args.target_root}') must not be under inputdata root " + f"'{args.inputdata_root}'" + ) + def main(): + # pylint: disable=missing-function-docstring args = parse_arguments() @@ -266,9 +448,14 @@ def main(): start_time = time.time() # --- Execution --- - replace_files_with_symlinks( - args.source_root, args.target_root, my_username, dry_run=args.dry_run - ) + for item in args.items_to_process: + replace_files_with_symlinks( + item, + args.target_root, + my_username, + inputdata_root=args.inputdata_root, + dry_run=args.dry_run, + ) if args.timing: elapsed_time = time.time() - start_time diff --git a/tests/relink/conftest.py b/tests/relink/conftest.py index 5a7eaa2..08f67da 100644 --- a/tests/relink/conftest.py +++ b/tests/relink/conftest.py @@ -7,6 +7,7 @@ import shutil import pytest +from unittest.mock import patch @pytest.fixture(scope="function", name="temp_dirs") @@ -15,7 +16,9 @@ def fixture_temp_dirs(): source_dir = tempfile.mkdtemp(prefix="test_source_") target_dir = tempfile.mkdtemp(prefix="test_target_") - yield source_dir, target_dir + with patch("relink.DEFAULT_SOURCE_ROOT", source_dir): + with patch("relink.DEFAULT_TARGET_ROOT", target_dir): + yield source_dir, target_dir # Cleanup shutil.rmtree(source_dir, ignore_errors=True) diff --git a/tests/relink/test_args.py b/tests/relink/test_args.py index d9b3136..38f9683 100644 --- a/tests/relink/test_args.py +++ b/tests/relink/test_args.py @@ -4,8 +4,7 @@ import os import sys -import tempfile -import shutil +from pathlib import Path import logging import argparse from unittest.mock import patch @@ -20,112 +19,91 @@ import relink # noqa: E402 -@pytest.fixture(scope="function", name="mock_default_dirs") -def fixture_mock_default_dirs(): - """Mock the default directories to use temporary directories.""" - source_dir = tempfile.mkdtemp(prefix="test_default_source_") - target_dir = tempfile.mkdtemp(prefix="test_default_target_") - - with patch.object(relink, "DEFAULT_SOURCE_ROOT", source_dir): - with patch.object(relink, "DEFAULT_TARGET_ROOT", target_dir): - yield source_dir, target_dir - - # Cleanup - shutil.rmtree(source_dir, ignore_errors=True) - shutil.rmtree(target_dir, ignore_errors=True) - - class TestParseArguments: """Test suite for parse_arguments function.""" - def test_default_arguments(self, mock_default_dirs): + def test_default_arguments(self, temp_dirs): """Test that default arguments are used when none provided.""" - source_dir, target_dir = mock_default_dirs - with patch("sys.argv", ["relink.py"]): + source_dir, target_dir = temp_dirs + with patch("sys.argv", ["relink.py", source_dir]): args = relink.parse_arguments() - assert args.source_root == source_dir + assert args.items_to_process == [source_dir] assert args.target_root == target_dir - def test_custom_source_root(self, mock_default_dirs, tmp_path): + def test_custom_source_root(self, temp_dirs): """Test custom source root argument.""" - _, target_dir = mock_default_dirs - custom_source = tmp_path / "custom_source" + source_dir, target_dir = temp_dirs + custom_source = Path(os.path.join(source_dir, "custom_source")) custom_source.mkdir() - with patch("sys.argv", ["relink.py", "--source-root", str(custom_source)]): + with patch("sys.argv", ["relink.py", str(custom_source)]): args = relink.parse_arguments() - assert args.source_root == str(custom_source.resolve()) + assert args.items_to_process == [str(custom_source.resolve())] assert args.target_root == target_dir - def test_custom_target_root(self, mock_default_dirs, tmp_path): + def test_custom_target_root(self, temp_dirs): """Test custom target root argument.""" - source_dir, _ = mock_default_dirs - custom_target = tmp_path / "custom_target" + source_dir, target_dir = temp_dirs + custom_target = Path(os.path.join(target_dir, "custom_target")) custom_target.mkdir() with patch("sys.argv", ["relink.py", "--target-root", str(custom_target)]): args = relink.parse_arguments() - assert args.source_root == source_dir + assert args.items_to_process == [source_dir] assert args.target_root == str(custom_target.resolve()) - def test_both_custom_paths(self, tmp_path): + def test_both_custom_paths(self, temp_dirs): """Test both custom source and target roots.""" - source_path = tmp_path / "custom_source" - target_path = tmp_path / "custom_target" - source_path.mkdir() - target_path.mkdir() + source_root, target_root = temp_dirs + source_dir = Path(os.path.join(source_root, "custom_source")) + target_dir = Path(os.path.join(target_root, "custom_target")) + source_dir.mkdir() + target_dir.mkdir() with patch( "sys.argv", [ "relink.py", - "--source-root", - str(source_path), + str(source_dir), "--target-root", - str(target_path), + str(target_dir), ], ): args = relink.parse_arguments() - assert args.source_root == str(source_path.resolve()) - assert args.target_root == str(target_path.resolve()) + assert args.items_to_process == [str(source_dir.resolve())] + assert args.target_root == str(target_dir.resolve()) - def test_verbose_flag(self, mock_default_dirs): # pylint: disable=unused-argument + def test_verbose_flag(self, temp_dirs): # pylint: disable=unused-argument """Test that --verbose flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "--verbose"]): args = relink.parse_arguments() assert args.verbose is True assert args.quiet is False - def test_quiet_flag(self, mock_default_dirs): # pylint: disable=unused-argument + def test_quiet_flag(self, temp_dirs): # pylint: disable=unused-argument """Test that --quiet flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "--quiet"]): args = relink.parse_arguments() assert args.quiet is True assert args.verbose is False - def test_verbose_short_flag( - self, mock_default_dirs - ): # pylint: disable=unused-argument + def test_verbose_short_flag(self, temp_dirs): # pylint: disable=unused-argument """Test that -v flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "-v"]): args = relink.parse_arguments() assert args.verbose is True - def test_quiet_short_flag( - self, mock_default_dirs - ): # pylint: disable=unused-argument + def test_quiet_short_flag(self, temp_dirs): # pylint: disable=unused-argument """Test that -q flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "-q"]): args = relink.parse_arguments() assert args.quiet is True - def test_default_verbosity( - self, mock_default_dirs - ): # pylint: disable=unused-argument + def test_default_verbosity(self, temp_dirs): # pylint: disable=unused-argument """Test that default verbosity has both flags as False.""" with patch("sys.argv", ["relink.py"]): args = relink.parse_arguments() assert args.verbose is False assert args.quiet is False - def test_verbose_and_quiet_mutually_exclusive(self, mock_default_dirs): + def test_verbose_and_quiet_mutually_exclusive(self, temp_dirs): """Test that --verbose and --quiet cannot be used together.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "--verbose", "--quiet"]): @@ -134,7 +112,7 @@ def test_verbose_and_quiet_mutually_exclusive(self, mock_default_dirs): # Mutually exclusive arguments cause SystemExit with code 2 assert exc_info.value.code == 2 - def test_verbose_and_quiet_short_flags_mutually_exclusive(self, mock_default_dirs): + def test_verbose_and_quiet_short_flags_mutually_exclusive(self, temp_dirs): """Test that -v and -q cannot be used together.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "-v", "-q"]): @@ -143,34 +121,78 @@ def test_verbose_and_quiet_short_flags_mutually_exclusive(self, mock_default_dir # Mutually exclusive arguments cause SystemExit with code 2 assert exc_info.value.code == 2 - def test_dry_run_flag(self, mock_default_dirs): + def test_dry_run_flag(self, temp_dirs): """Test that --dry-run flag is parsed correctly.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "--dry-run"]): args = relink.parse_arguments() assert args.dry_run is True - def test_dry_run_default(self, mock_default_dirs): + def test_dry_run_default(self, temp_dirs): """Test that dry_run defaults to False.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py"]): args = relink.parse_arguments() assert args.dry_run is False - def test_timing_flag(self, mock_default_dirs): + def test_timing_flag(self, temp_dirs): """Test that --timing flag is parsed correctly.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "--timing"]): args = relink.parse_arguments() assert args.timing is True - def test_timing_default(self, mock_default_dirs): + def test_timing_default(self, temp_dirs): """Test that timing defaults to False.""" # pylint: disable=unused-argument with patch("sys.argv", ["relink.py"]): args = relink.parse_arguments() assert args.timing is False + def test_multiple_source_roots(self, temp_dirs): + """Test that multiple source root arguments are parsed correctly.""" + inputdata_root, target_dir = temp_dirs + source1 = Path(os.path.join(inputdata_root, "source1")) + source2 = Path(os.path.join(inputdata_root, "source2")) + source3 = Path(os.path.join(inputdata_root, "source3")) + source1.mkdir() + source2.mkdir() + source3.mkdir() + + with patch("sys.argv", ["relink.py", str(source1), str(source2), str(source3)]): + args = relink.parse_arguments() + assert len(args.items_to_process) == 3 + assert str(source1.resolve()) in args.items_to_process + assert str(source2.resolve()) in args.items_to_process + assert str(source3.resolve()) in args.items_to_process + assert args.target_root == target_dir + + def test_multiple_source_roots_with_target(self, temp_dirs): + """Test multiple source roots with custom target root.""" + inputdata_root, target_dir = temp_dirs + source1 = Path(os.path.join(inputdata_root, "source1")) + source2 = Path(os.path.join(inputdata_root, "source2")) + target = Path(os.path.join(target_dir, "target")) + source1.mkdir() + source2.mkdir() + target.mkdir() + + with patch( + "sys.argv", + [ + "relink.py", + str(source1), + str(source2), + "--target-root", + str(target), + ], + ): + args = relink.parse_arguments() + assert len(args.items_to_process) == 2 + assert str(source1.resolve()) in args.items_to_process + assert str(source2.resolve()) in args.items_to_process + assert args.target_root == str(target.resolve()) + class TestValidateDirectory: """Test suite for validate_directory function.""" @@ -193,15 +215,74 @@ def test_nonexistent_directory(self): assert "does not exist" in str(exc_info.value) assert nonexistent in str(exc_info.value) + def test_relative_path_converted_to_absolute(self, tmp_path): + """Test that relative paths are converted to absolute.""" + test_dir = tmp_path / "relative_test" + test_dir.mkdir() + + # Change to parent directory and use relative path + cwd = os.getcwd() + try: + os.chdir(str(tmp_path)) + result = relink.validate_directory("relative_test") + assert os.path.isabs(result) + assert result == str(test_dir.resolve()) + finally: + os.chdir(cwd) + + def test_symlink_to_directory(self, tmp_path): + """Test that symlink to a directory is accepted.""" + real_dir = tmp_path / "real_dir" + real_dir.mkdir() + + link_dir = tmp_path / "link_dir" + link_dir.symlink_to(real_dir) + + result = relink.validate_paths(str(link_dir)) + # validate_directory returns absolute path of the symlink itself + assert result == str(link_dir.absolute()) + # Verify it's still a symlink + assert os.path.islink(result) + + def test_list_with_invalid_directory(self, tmp_path): + """Test that a list with one invalid directory raises error.""" + dir1 = tmp_path / "dir1" + dir1.mkdir() + nonexistent = tmp_path / "nonexistent" + + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.validate_paths([str(dir1), str(nonexistent)]) + + assert "does not exist" in str(exc_info.value) + + +class TestValidatePaths: + """Test suite for validate_paths function.""" + + def test_valid_directory(self, tmp_path): + """Test that valid directory is accepted and returns absolute path.""" + test_dir = tmp_path / "valid_dir" + test_dir.mkdir() + + result = relink.validate_paths(str(test_dir)) + assert result == str(test_dir.resolve()) + + def test_nonexistent_directory(self): + """Test that nonexistent directory raises ArgumentTypeError.""" + nonexistent = os.path.join(os.sep, "nonexistent", "directory", "12345") + + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.validate_paths(nonexistent) + + assert "does not exist" in str(exc_info.value) + assert nonexistent in str(exc_info.value) + def test_file_instead_of_directory(self, tmp_path): - """Test that a file path raises ArgumentTypeError.""" + """Test that a file path doesn't raise ArgumentTypeError (or any error).""" test_file = tmp_path / "test_file.txt" test_file.write_text("content") - with pytest.raises(argparse.ArgumentTypeError) as exc_info: - relink.validate_directory(str(test_file)) - - assert "not a directory" in str(exc_info.value) + relink.validate_paths(str(test_file)) def test_relative_path_converted_to_absolute(self, tmp_path): """Test that relative paths are converted to absolute.""" @@ -212,7 +293,7 @@ def test_relative_path_converted_to_absolute(self, tmp_path): cwd = os.getcwd() try: os.chdir(str(tmp_path)) - result = relink.validate_directory("relative_test") + result = relink.validate_paths("relative_test") assert os.path.isabs(result) assert result == str(test_dir.resolve()) finally: @@ -226,12 +307,32 @@ def test_symlink_to_directory(self, tmp_path): link_dir = tmp_path / "link_dir" link_dir.symlink_to(real_dir) - result = relink.validate_directory(str(link_dir)) + result = relink.validate_paths(str(link_dir)) # validate_directory returns absolute path of the symlink itself assert result == str(link_dir.absolute()) # Verify it's still a symlink assert os.path.islink(result) + def test_list_with_invalid_directory(self, tmp_path): + """Test that a list with one invalid directory raises error.""" + dir1 = tmp_path / "dir1" + dir1.mkdir() + nonexistent = tmp_path / "nonexistent" + + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.validate_paths([str(dir1), str(nonexistent)]) + + assert "does not exist" in str(exc_info.value) + + def test_list_with_file_instead_of_directory(self, tmp_path): + """Test that a list containing a file doesn't raise error.""" + dir1 = tmp_path / "dir1" + dir1.mkdir() + file1 = tmp_path / "file.txt" + file1.write_text("content") + + relink.validate_paths([str(dir1), str(file1)]) + class TestProcessArgs: """Test suite for process_args function.""" @@ -264,3 +365,29 @@ def test_process_args_modifies_args_in_place(self): # Should be the same object, modified in place assert args is original_args assert hasattr(args, "log_level") + + def test_error_if_source_not_in_inputdata(self): + """Test that process_args errors if source isn't in inputdata_root.""" + args = argparse.Namespace( + quiet=False, + verbose=False, + items_to_process=os.path.abspath("abc123"), + inputdata_root=os.path.abspath("def456"), + ) + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.process_args(args) + assert "not under inputdata root" in str(exc_info.value) + + def test_error_if_target_in_inputdata(self): + """Test that process_args errors if target is in inputdata_root.""" + inputdata_root = os.path.abspath("inputdata") + target_root = os.path.join(inputdata_root, "abc123") + args = argparse.Namespace( + quiet=False, + verbose=False, + target_root=target_root, + inputdata_root=inputdata_root, + ) + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.process_args(args) + assert "must not be under inputdata root" in str(exc_info.value) diff --git a/tests/relink/test_cmdline.py b/tests/relink/test_cmdline.py index 2274515..44cd27c 100644 --- a/tests/relink/test_cmdline.py +++ b/tests/relink/test_cmdline.py @@ -5,6 +5,7 @@ import os import sys import subprocess +from pathlib import Path import pytest @@ -40,11 +41,12 @@ def test_command_line_execution_dry_run(mock_dirs): command = [ sys.executable, relink_script, - "--source-root", str(source_dir), "--target-root", str(target_dir), "--dry-run", + "--inputdata-root", + str(source_dir), ] # Execute the command @@ -62,8 +64,8 @@ def test_command_line_execution_dry_run(mock_dirs): assert not source_file.is_symlink() -def test_command_line_execution_actual_run(mock_dirs): - """Test executing relink.py from command line without dry-run.""" +def test_command_line_execution_given_dir(mock_dirs): + """Test executing relink.py from command line given a directory.""" source_dir, target_dir, source_file, target_file = mock_dirs # Get the path to relink.py @@ -76,10 +78,11 @@ def test_command_line_execution_actual_run(mock_dirs): command = [ sys.executable, relink_script, - "--source-root", str(source_dir), "--target-root", str(target_dir), + "-inputdata", + str(source_dir), ] # Execute the command @@ -94,3 +97,148 @@ def test_command_line_execution_actual_run(mock_dirs): # Verify success messages in output assert "Created symbolic link:" in result.stdout + + +def test_command_line_execution_given_file(mock_dirs): + """Test executing relink.py from command line given a file.""" + source_dir, target_dir, source_file, target_file = mock_dirs + + # Get the path to relink.py + relink_script = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "relink.py", + ) + + # Build the command + command = [ + sys.executable, + relink_script, + str(source_file), + "--target-root", + str(target_dir), + "-inputdata", + str(source_dir), + ] + + # Execute the command + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Verify the command executed successfully + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + + # Verify the file was converted to a symlink + assert source_file.is_symlink() + assert os.readlink(str(source_file)) == str(target_file) + + # Verify success messages in output + assert "Created symbolic link:" in result.stdout + + +def test_command_line_multiple_source_dirs(temp_dirs): + """Test executing relink.py with multiple source directories.""" + inputdata_dir, target_dir = temp_dirs + # Create multiple source directories + source1 = Path(os.path.join(inputdata_dir, "source1")) + source2 = Path(os.path.join(inputdata_dir, "source2")) + target1 = Path(os.path.join(target_dir, "source1")) + target2 = Path(os.path.join(target_dir, "source2")) + source1.mkdir() + source2.mkdir() + target1.mkdir() + target2.mkdir() + + # Create files in each source directory + source1_file = source1 / "file1.txt" + source2_file = source2 / "file2.txt" + target1_file = target1 / "file1.txt" + target2_file = target2 / "file2.txt" + + source1_file.write_text("source1 content") + source2_file.write_text("source2 content") + target1_file.write_text("target1 content") + target2_file.write_text("target2 content") + + # Get the path to relink.py + relink_script = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "relink.py", + ) + + # Build the command with multiple source directories + command = [ + sys.executable, + relink_script, + str(source1), + str(source2), + "--target-root", + target_dir, + "--inputdata-root", + str(inputdata_dir), + ] + + # Execute the command + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Verify the command executed successfully + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + + # Verify both files were converted to symlinks + assert source1_file.is_symlink() + assert source2_file.is_symlink() + assert os.readlink(str(source1_file)) == str(target1_file) + assert os.readlink(str(source2_file)) == str(target2_file) + + +def test_command_line_source_dir_and_file(temp_dirs): + """Test executing relink.py with a source directory and source file.""" + inputdata_dir, target_dir = temp_dirs + # Create multiple source directories + source1 = Path(os.path.join(inputdata_dir, "source1")) + source2 = Path(os.path.join(inputdata_dir, "source2")) + target1 = Path(os.path.join(target_dir, "source1")) + target2 = Path(os.path.join(target_dir, "source2")) + source1.mkdir() + source2.mkdir() + target1.mkdir() + target2.mkdir() + + # Create files in each source directory + source1_file = source1 / "file1.txt" + source2_file = source2 / "file2.txt" + target1_file = target1 / "file1.txt" + target2_file = target2 / "file2.txt" + + source1_file.write_text("source1 content") + source2_file.write_text("source2 content") + target1_file.write_text("target1 content") + target2_file.write_text("target2 content") + + # Get the path to relink.py + relink_script = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "relink.py", + ) + + # Build the command + command = [ + sys.executable, + relink_script, + str(source1), + source2_file, + "--target-root", + target_dir, + "--inputdata-root", + str(inputdata_dir), + ] + + # Execute the command + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Verify the command executed successfully + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + + # Verify both files were converted to symlinks + assert source1_file.is_symlink() + assert source2_file.is_symlink() + assert os.readlink(str(source1_file)) == str(target1_file) + assert os.readlink(str(source2_file)) == str(target2_file) diff --git a/tests/relink/test_dryrun.py b/tests/relink/test_dryrun.py index b18c676..ee638c4 100644 --- a/tests/relink/test_dryrun.py +++ b/tests/relink/test_dryrun.py @@ -5,6 +5,7 @@ import os import sys import logging +from unittest.mock import patch import pytest @@ -46,7 +47,7 @@ def test_dry_run_no_changes(dry_run_setup, caplog): # Run in dry-run mode with caplog.at_level(logging.INFO): relink.replace_files_with_symlinks( - source_dir, target_dir, username, dry_run=True + source_dir, target_dir, username, inputdata_root=source_dir, dry_run=True ) # Verify no changes were made @@ -64,7 +65,7 @@ def test_dry_run_shows_message(dry_run_setup, caplog): # Run in dry-run mode with caplog.at_level(logging.INFO): relink.replace_files_with_symlinks( - source_dir, target_dir, username, dry_run=True + source_dir, target_dir, username, inputdata_root=source_dir, dry_run=True ) # Check that dry-run messages were logged @@ -80,7 +81,7 @@ def test_dry_run_no_delete_or_create_messages(dry_run_setup, caplog): # Run in dry-run mode with caplog.at_level(logging.INFO): relink.replace_files_with_symlinks( - source_dir, target_dir, username, dry_run=True + source_dir, target_dir, username, inputdata_root=source_dir, dry_run=True ) # Verify actual operation messages are NOT logged diff --git a/tests/relink/test_find_owned_files_scandir.py b/tests/relink/test_find_owned_files_scandir.py index d0718a3..0ce9d35 100644 --- a/tests/relink/test_find_owned_files_scandir.py +++ b/tests/relink/test_find_owned_files_scandir.py @@ -6,6 +6,10 @@ import sys import tempfile import logging +from unittest.mock import patch +from contextlib import contextmanager + +import pytest # Add parent directory to path to import relink module sys.path.insert( @@ -15,8 +19,79 @@ import relink # noqa: E402 -def test_find_owned_files_basic(temp_dirs): - """Test basic functionality: find files owned by user.""" +class MockDirEntry: + """Wrapper for DirEntry that allows mocking stat() for specific files.""" + + # pylint: disable=missing-function-docstring + + def __init__(self, entry, uid_override=None): + """ + Initialize MockDirEntry. + + Args: + entry: The original DirEntry object. + uid_override: Dict mapping filename to UID to override in stat results. + """ + self._entry = entry + self._uid_override = uid_override or {} + + def __getattr__(self, name): + return getattr(self._entry, name) + + def stat(self, *args, **kwargs): + stat_result = self._entry.stat(*args, **kwargs) + if self._entry.name in self._uid_override: + # Create a modified stat result with different UID + modified_stat = os.stat_result( + ( + stat_result.st_mode, + stat_result.st_ino, + stat_result.st_dev, + stat_result.st_nlink, + self._uid_override[self._entry.name], # Override UID + stat_result.st_gid, + stat_result.st_size, + stat_result.st_atime, + stat_result.st_mtime, + stat_result.st_ctime, + ) + ) + return modified_stat + return stat_result + + def is_file(self, *args, **kwargs): + return self._entry.is_file(*args, **kwargs) + + def is_dir(self, *args, **kwargs): + return self._entry.is_dir(*args, **kwargs) + + def is_symlink(self): + return self._entry.is_symlink() + + +def create_mock_scandir(uid_override=None): + """ + Create a mock scandir function that wraps entries with MockDirEntry. + + Args: + uid_override: Dict mapping filename to UID to override in stat results. + + Returns: + A context manager function that can be used with patch. + """ + original_scandir = os.scandir + + @contextmanager + def mock_scandir(path): + with original_scandir(path) as entries: + wrapped_entries = [MockDirEntry(entry, uid_override) for entry in entries] + yield iter(wrapped_entries) + + return mock_scandir + + +def test_find_owned_files_basic_indir(temp_dirs): + """Test basic functionality: find files owned by user in a directory.""" source_dir, _ = temp_dirs user_uid = os.stat(source_dir).st_uid @@ -30,7 +105,9 @@ def test_find_owned_files_basic(temp_dirs): f.write("content2") # Find owned files - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir(source_dir, user_uid, inputdata_root=source_dir) + ) # Verify both files were found assert len(found_files) == 2 @@ -38,6 +115,34 @@ def test_find_owned_files_basic(temp_dirs): assert file2 in found_files +def test_find_owned_files_basic_asfiles(temp_dirs): + """Test basic functionality: find files owned by user given their paths directly.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create files + file1 = os.path.join(source_dir, "file1.txt") + file2 = os.path.join(source_dir, "file2.txt") + file_list = [file1, file2] + + for file in file_list: + with open(file, "w", encoding="utf-8") as f: + f.write("content") + + # Find owned files + found_files = [] + for file in file_list: + found_files += list( + relink.find_owned_files_scandir(file, user_uid, inputdata_root=source_dir) + ) + + # Verify both files were found + assert len(found_files) == 2 + print(f"{found_files=}") + assert file1 in found_files + assert file2 in found_files + + def test_find_owned_files_nested(temp_dirs): """Test finding files in nested directory structures.""" source_dir, _ = temp_dirs @@ -57,7 +162,9 @@ def test_find_owned_files_nested(temp_dirs): fp.write("content") # Find owned files - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir(source_dir, user_uid, inputdata_root=source_dir) + ) # Verify all files were found assert len(found_files) == 3 @@ -82,8 +189,12 @@ def test_skip_symlinks(temp_dirs, caplog): os.symlink(dummy_target, symlink_path) # Find owned files with logging - with caplog.at_level(logging.INFO): - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + with caplog.at_level(logging.DEBUG): + found_files = list( + relink.find_owned_files_scandir( + source_dir, user_uid, inputdata_root=source_dir + ) + ) # Verify only regular file was found assert len(found_files) == 1 @@ -95,13 +206,61 @@ def test_skip_symlinks(temp_dirs, caplog): assert symlink_path in caplog.text +def test_skip_symlinks_owned_by_different_user(temp_dirs, caplog): + """Test that symlinks owned by different users are not logged. + + Since find_owned_files_scandir filters by UID first, symlinks owned + by other users should never reach the symlink check and thus should + not generate a "Skipping symlink" log message. + """ + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Use a different UID + different_uid = user_uid + 1000 + + # Create a regular file owned by current user + regular_file = os.path.join(source_dir, "regular.txt") + with open(regular_file, "w", encoding="utf-8") as f: + f.write("content") + + # Create a symlink + symlink_path = os.path.join(source_dir, "other_user_link.txt") + dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") + os.symlink(dummy_target, symlink_path) + + # Mock DirEntry.stat to return different UID for the symlink + uid_override = {"other_user_link.txt": different_uid} + mock_scandir = create_mock_scandir(uid_override) + + with patch("os.scandir", side_effect=mock_scandir): + with caplog.at_level(logging.INFO): + found_files = list( + relink.find_owned_files_scandir( + source_dir, user_uid, inputdata_root=source_dir + ) + ) + + # Verify only regular file was found + assert len(found_files) == 1 + assert regular_file in found_files + assert symlink_path not in found_files + + # Check that "Skipping symlink" message was NOT logged for the other user's symlink + # (it should be filtered out by UID check before reaching symlink check) + if "Skipping symlink:" in caplog.text: + assert symlink_path not in caplog.text + + def test_empty_directory(temp_dirs): """Test with empty directory.""" source_dir, _ = temp_dirs user_uid = os.stat(source_dir).st_uid # Find owned files in empty directory - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir(source_dir, user_uid, inputdata_root=source_dir) + ) # Should return empty list assert len(found_files) == 0 @@ -130,7 +289,11 @@ def test_permission_error_handling(temp_dirs, caplog): try: # Find owned files with debug logging with caplog.at_level(logging.DEBUG): - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir( + source_dir, user_uid, inputdata_root=source_dir + ) + ) # Should find the accessible file but skip the inaccessible directory assert file1 in found_files @@ -157,7 +320,9 @@ def test_only_files_not_directories(temp_dirs): os.makedirs(subdir) # Find owned files - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir(source_dir, user_uid, inputdata_root=source_dir) + ) # Should only find the file, not the directory assert len(found_files) == 1 @@ -188,7 +353,11 @@ def test_does_not_follow_symlink_directories(temp_dirs): os.symlink(external_dir, symlink_dir) # Find owned files - found_files = list(relink.find_owned_files_scandir(source_dir, user_uid)) + found_files = list( + relink.find_owned_files_scandir( + source_dir, user_uid, inputdata_root=source_dir + ) + ) # Should find file in real directory but not in symlinked directory assert file_in_real in found_files diff --git a/tests/relink/test_handle_non_dir.py b/tests/relink/test_handle_non_dir.py new file mode 100644 index 0000000..172e2f4 --- /dev/null +++ b/tests/relink/test_handle_non_dir.py @@ -0,0 +1,429 @@ +""" +Tests of handle_non_dir() and _handle_non_dir_entry() in relink.py +""" + +# pylint: disable=protected-access + +import os +import sys +import tempfile +import logging +from unittest.mock import Mock, patch + +import pytest + +# Add parent directory to path to import relink module +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +# pylint: disable=wrong-import-position +import relink # noqa: E402 + + +@pytest.fixture(name="mock_direntry") +def fixture_mock_direntry(): + """ + Factory fixture to create mock DirEntry objects. + + Returns: + callable: A function that creates a mock DirEntry with specified properties. + """ + + def _create_mock(name, path, uid, is_file=True, is_symlink=False): + """ + Create a mock DirEntry object. + + Args: + name (str): The name of the file/directory. + path (str): The full path to the file/directory. + uid (int): The UID of the owner. + is_file (bool): Whether this is a file. + is_symlink (bool): Whether this is a symlink. + + Returns: + Mock: A mock DirEntry object. + """ + mock_entry = Mock(spec=os.DirEntry) + mock_entry.name = name + mock_entry.path = path + + mock_stat = Mock() + mock_stat.st_uid = uid + mock_entry.stat.return_value = mock_stat + + mock_entry.is_file.return_value = is_file + mock_entry.is_symlink.return_value = is_symlink + + return mock_entry + + return _create_mock + + +@pytest.fixture(name="mock_stat_with_different_uid") +def fixture_mock_stat_with_different_uid(): + """ + Factory fixture to create a mock os.stat function that returns different UID for specific files. + + Returns: + callable: A function that creates a mock stat function. + """ + + def _create_mock_stat(file_path, different_uid): + """ + Create a mock stat function that returns different UID for a specific file. + + Args: + file_path (str): The path to the file that should have different UID. + different_uid (int): The UID to return for that file. + + Returns: + callable: A function that can be used with patch("os.stat", side_effect=...) + """ + original_stat = os.stat + + def mock_stat(path, *args, **kwargs): + stat_result = original_stat(path, *args, **kwargs) + if path == file_path: + # Create modified stat result with different UID + modified_stat = os.stat_result( + ( + stat_result.st_mode, + stat_result.st_ino, + stat_result.st_dev, + stat_result.st_nlink, + different_uid, # Different UID + stat_result.st_gid, + stat_result.st_size, + stat_result.st_atime, + stat_result.st_mtime, + stat_result.st_ctime, + ) + ) + return modified_stat + return stat_result + + return mock_stat + + return _create_mock_stat + + +class TestHandleNonDirEntry: + """ + Tests for _handle_non_dir_entry() function. + + Logging tests are in test_verbosity.py. + """ + + def test_returns_path_for_owned_regular_file(self, temp_dirs): + """Test that owned regular files return their path.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a regular file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get DirEntry for the file + with os.scandir(source_dir) as entries: + entry = next(e for e in entries if e.name == "test.txt") + result = relink._handle_non_dir_entry(entry, user_uid) + + assert result == test_file + + def test_returns_none_for_file_owned_by_different_user( + self, temp_dirs, mock_direntry + ): + """Test that files owned by different users return None.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + different_uid = user_uid + 1000 + + # Create a file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Create mock entry with different UID + mock_entry = mock_direntry( + "test.txt", test_file, different_uid, is_file=True, is_symlink=False + ) + + result = relink._handle_non_dir_entry(mock_entry, user_uid) + + assert result is None + + def test_returns_none_and_logs_for_owned_symlink(self, temp_dirs, caplog): + """Test that owned symlinks return None and are logged.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a symlink + symlink_path = os.path.join(source_dir, "link.txt") + dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") + os.symlink(dummy_target, symlink_path) + + # Get DirEntry for the symlink + with os.scandir(source_dir) as entries: + entry = next(e for e in entries if e.name == "link.txt") + + with caplog.at_level(logging.DEBUG): + result = relink._handle_non_dir_entry(entry, user_uid) + + assert result is None + assert "Skipping symlink:" in caplog.text + assert symlink_path in caplog.text + + def test_returns_none_for_symlink_owned_by_different_user( + self, temp_dirs, caplog, mock_direntry + ): + """Test that symlinks owned by different users return None without logging.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + different_uid = user_uid + 1000 + + # Create a symlink + symlink_path = os.path.join(source_dir, "link.txt") + dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") + os.symlink(dummy_target, symlink_path) + + # Create mock entry with different UID + mock_entry = mock_direntry( + "link.txt", symlink_path, different_uid, is_file=False, is_symlink=True + ) + + with caplog.at_level(logging.DEBUG): + result = relink._handle_non_dir_entry(mock_entry, user_uid) + + assert result is None + # Should NOT log because it's not owned by the user + assert "Skipping symlink:" not in caplog.text + + def test_handles_file_with_spaces(self, temp_dirs): + """Test that files with spaces in names are handled correctly.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file with spaces + test_file = os.path.join(source_dir, "file with spaces.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get DirEntry for the file + with os.scandir(source_dir) as entries: + entry = next(e for e in entries if e.name == "file with spaces.txt") + result = relink._handle_non_dir_entry(entry, user_uid) + + assert result == test_file + + def test_handles_file_with_special_characters(self, temp_dirs): + """Test that files with special characters are handled correctly.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file with special characters + filename = "file-with_special.chars@123.txt" + test_file = os.path.join(source_dir, filename) + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get DirEntry for the file + with os.scandir(source_dir) as entries: + entry = next(e for e in entries if e.name == filename) + result = relink._handle_non_dir_entry(entry, user_uid) + + assert result == test_file + + +class TestHandleNonDirStr: + """ + Tests for _handle_non_dir_str() function. + + TODO: Logging tests are in test_verbosity.py. + """ + + def test_returns_path_for_owned_regular_file(self, temp_dirs): + """Test that owned regular files return their path.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a regular file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get path of the file + result = relink._handle_non_dir_str(test_file, user_uid) + + assert result == test_file + + def test_returns_none_for_file_owned_by_different_user( + self, temp_dirs, mock_stat_with_different_uid + ): + """Test that files owned by different users return None.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + different_uid = user_uid + 1000 + + # Create a file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Create mock stat function + mock_stat = mock_stat_with_different_uid(test_file, different_uid) + + with patch("os.stat", side_effect=mock_stat): + result = relink._handle_non_dir_str(test_file, user_uid) + + assert result is None + + def test_returns_none_and_logs_for_owned_symlink(self, temp_dirs, caplog): + """Test that owned symlinks return None and are logged.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a symlink + symlink_path = os.path.join(source_dir, "link.txt") + dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") + os.symlink(dummy_target, symlink_path) + + # Get path of the symlink + with caplog.at_level(logging.DEBUG): + result = relink._handle_non_dir_str(symlink_path, user_uid) + + assert result is None + assert "Skipping symlink:" in caplog.text + assert symlink_path in caplog.text + + def test_returns_none_for_symlink_owned_by_different_user( + self, temp_dirs, caplog, mock_stat_with_different_uid + ): + """Test that symlinks owned by different users return None without logging.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + different_uid = user_uid + 1000 + + # Create a symlink + symlink_path = os.path.join(source_dir, "link.txt") + dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") + os.symlink(dummy_target, symlink_path) + + # Create mock stat function + mock_stat = mock_stat_with_different_uid(symlink_path, different_uid) + + with patch("os.stat", side_effect=mock_stat): + with caplog.at_level(logging.DEBUG): + result = relink._handle_non_dir_str(symlink_path, user_uid) + + assert result is None + # Should NOT log because it's not owned by the user + assert "Skipping symlink:" not in caplog.text + + def test_handles_file_with_spaces(self, temp_dirs): + """Test that files with spaces in names are handled correctly.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file with spaces + test_file = os.path.join(source_dir, "file with spaces.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get path of the file + result = relink._handle_non_dir_str(test_file, user_uid) + + assert result == test_file + + def test_handles_file_with_special_characters(self, temp_dirs): + """Test that files with special characters are handled correctly.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file with special characters + filename = "file-with_special.chars@123.txt" + test_file = os.path.join(source_dir, filename) + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get path of the file + result = relink._handle_non_dir_str(test_file, user_uid) + + assert result == test_file + + def test_error_file_doesnt_exist(self, temp_dirs): + """Test that error is thrown if file doesn't exist.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file name that doesn't exist + filename = "filename.txt" + test_file = os.path.join(source_dir, filename) + assert not os.path.exists(test_file) + + # Get path of the file + with pytest.raises(FileNotFoundError): + relink._handle_non_dir_str(test_file, user_uid) + + +class TestHandleNonDir: + """Tests for handle_non_dir() function.""" + + def test_works_with_direntry(self, temp_dirs): + """Test that handle_non_dir works with os.DirEntry objects.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a regular file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get DirEntry for the file + with os.scandir(source_dir) as entries: + entry = next(e for e in entries if e.name == "test.txt") + result = relink.handle_non_dir(entry, user_uid) + + assert result == test_file + + def test_works_with_str(self, temp_dirs): + """Test that handle_non_dir works with strings.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a regular file + test_file = os.path.join(source_dir, "test.txt") + with open(test_file, "w", encoding="utf-8") as f: + f.write("content") + + # Get path of the file + result = relink.handle_non_dir(test_file, user_uid) + + assert result == test_file + + def test_errors_with_str_file_doesnt_exist(self, temp_dirs): + """Test that handle_non_dir throws error if given string pointing to nonexistent file.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + # Create a file name that doesn't exist + test_file = "filename.txt" + assert not os.path.exists(test_file) + + # Get path of the file + with pytest.raises(FileNotFoundError): + relink.handle_non_dir(test_file, user_uid) + + def test_raises_typeerror_for_int(self, temp_dirs): + """Test that handle_non_dir raises TypeError for an integer.""" + source_dir, _ = temp_dirs + user_uid = os.stat(source_dir).st_uid + + invalid_input = 12345 + expected_type = type(invalid_input) + + with pytest.raises( + TypeError, + match=f"Unsure how to handle non-directory variable of type.*{expected_type}", + ): + relink.handle_non_dir(invalid_input, user_uid) diff --git a/tests/relink/test_replace_files_with_symlinks.py b/tests/relink/test_replace_files_with_symlinks.py index 3d670fe..6fa2820 100644 --- a/tests/relink/test_replace_files_with_symlinks.py +++ b/tests/relink/test_replace_files_with_symlinks.py @@ -1,5 +1,7 @@ """ -Tests of replace_files_with_symlinks() in relink.py +Tests of replace_files_with_symlinks() in relink.py. Note that this module is focused on testing +just the logic of this function. The actual replacement and other stuff that happens in +replace_one_file_with_symlink() is tested in test_replace_one_file_with_symlink. """ import os @@ -7,7 +9,8 @@ import tempfile import pwd import logging -from unittest.mock import patch +from unittest.mock import patch, call +import pytest # Add parent directory to path to import relink module sys.path.insert( @@ -17,13 +20,20 @@ import relink # noqa: E402 -def test_basic_file_replacement(temp_dirs, current_user): - """Test basic functionality: replace owned file with symlink.""" - source_dir, target_dir = temp_dirs +@pytest.fixture(name="mock_replace_one") +def fixture_mock_replace_one(): + """Fixture that mocks relink.replace_one_file_with_symlink""" + with patch("relink.replace_one_file_with_symlink") as mock: + yield mock + + +def test_basic_file_replacement_given_dir(temp_dirs, current_user, mock_replace_one): + """Test basic functionality: given directory, replace owned file with symlink.""" + inputdata_root, target_dir = temp_dirs username = current_user # Create a file in source directory - source_file = os.path.join(source_dir, "test_file.txt") + source_file = os.path.join(inputdata_root, "test_file.txt") with open(source_file, "w", encoding="utf-8") as f: f.write("source content") @@ -33,27 +43,93 @@ def test_basic_file_replacement(temp_dirs, current_user): f.write("target content") # Run the function - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) + + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) + + +def test_basic_file_replacement_given_file(temp_dirs, current_user, mock_replace_one): + """Test basic functionality: given owned file, replace with symlink.""" + inputdata_root, target_dir = temp_dirs + username = current_user - # Verify the source file is now a symlink - assert os.path.islink(source_file), "Source file should be a symlink" - assert ( - os.readlink(source_file) == target_file - ), "Symlink should point to target file" + # Create a file in source directory + source_file = os.path.join(inputdata_root, "test_file.txt") + with open(source_file, "w", encoding="utf-8") as f: + f.write("source content") + # Create corresponding file in target directory + target_file = os.path.join(target_dir, "test_file.txt") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") -def test_nested_directory_structure(temp_dirs, current_user): + # Run the function + relink.replace_files_with_symlinks( + source_file, target_dir, username, inputdata_root=inputdata_root + ) + + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) + + +def test_dry_run(temp_dirs, current_user, mock_replace_one): + """Test that dry_run=True is passed correctly.""" + inputdata_root, target_dir = temp_dirs + username = current_user + + # Create a file in source directory + source_file = os.path.join(inputdata_root, "test_file.txt") + with open(source_file, "w", encoding="utf-8") as f: + f.write("source content") + + # Create corresponding file in target directory + target_file = os.path.join(target_dir, "test_file.txt") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + # Run the function + relink.replace_files_with_symlinks( + inputdata_root, + target_dir, + username, + inputdata_root=inputdata_root, + dry_run=True, + ) + + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=True, + ) + + +def test_nested_directory_structure(temp_dirs, current_user, mock_replace_one): """Test with nested directory structures.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Create nested directories nested_path = os.path.join("subdir1", "subdir2") - os.makedirs(os.path.join(source_dir, nested_path)) + os.makedirs(os.path.join(inputdata_root, nested_path)) os.makedirs(os.path.join(target_dir, nested_path)) # Create files in nested directories - source_file = os.path.join(source_dir, nested_path, "nested_file.txt") + source_file = os.path.join(inputdata_root, nested_path, "nested_file.txt") target_file = os.path.join(target_dir, nested_path, "nested_file.txt") with open(source_file, "w", encoding="utf-8") as f: @@ -62,16 +138,22 @@ def test_nested_directory_structure(temp_dirs, current_user): f.write("nested target") # Run the function - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) - # Verify - assert os.path.islink(source_file), "Nested file should be a symlink" - assert os.readlink(source_file) == target_file + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) -def test_skip_existing_symlinks(temp_dirs, current_user, caplog): +def test_skip_existing_symlinks(temp_dirs, current_user, caplog, mock_replace_one): """Test that existing symlinks are skipped.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Create a target file @@ -80,59 +162,48 @@ def test_skip_existing_symlinks(temp_dirs, current_user, caplog): f.write("target") # Create a symlink in source (pointing somewhere else) - source_link = os.path.join(source_dir, "existing_link.txt") + source_link = os.path.join(inputdata_root, "existing_link.txt") dummy_target = os.path.join(tempfile.gettempdir(), "somewhere") os.symlink(dummy_target, source_link) - # Get the inode and mtime before running the function - stat_before = os.lstat(source_link) - # Run the function - with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Verify the symlink is unchanged (same inode means it wasn't deleted/recreated) - stat_after = os.lstat(source_link) - assert ( - stat_before.st_ino == stat_after.st_ino - ), "Symlink should not have been recreated" - assert ( - stat_before.st_mtime == stat_after.st_mtime - ), "Symlink mtime should be unchanged" - assert ( - os.readlink(source_link) == dummy_target - ), "Symlink target should be unchanged" - - # Check that "Skipping symlink" message was logged - assert "Skipping symlink:" in caplog.text - assert source_link in caplog.text - - -def test_missing_target_file(temp_dirs, current_user, caplog): + with caplog.at_level(logging.DEBUG): + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) + + # Verify replace_one_file_with_symlink() wasn't called + mock_replace_one.assert_not_called() + + +def test_missing_target_file(temp_dirs, current_user, caplog, mock_replace_one): """Test behavior when target file doesn't exist.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Create only source file (no corresponding target) - source_file = os.path.join(source_dir, "orphan.txt") + source_file = os.path.join(inputdata_root, "orphan.txt") with open(source_file, "w", encoding="utf-8") as f: f.write("orphan content") # Run the function with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Verify the file is NOT converted to symlink - assert not os.path.islink(source_file), "File should not be a symlink" - assert os.path.isfile(source_file), "Original file should still exist" + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) - # Check warning message - assert "Warning: Corresponding file not found" in caplog.text + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) -def test_invalid_username(temp_dirs, caplog): +def test_invalid_username(temp_dirs, caplog, mock_replace_one): """Test behavior with invalid username.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs # Use a username that doesn't exist invalid_username = "nonexistent_user_12345" @@ -145,21 +216,22 @@ def test_invalid_username(temp_dirs, caplog): # Run the function with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, invalid_username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, invalid_username, inputdata_root=inputdata_root + ) - # Check error message - assert "Error: User" in caplog.text - assert "not found" in caplog.text + # Verify replace_one_file_with_symlink() wasn't called + mock_replace_one.assert_not_called() -def test_multiple_files(temp_dirs, current_user): +def test_multiple_files(temp_dirs, current_user, mock_replace_one): """Test with multiple files in the directory.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Create multiple files for i in range(5): - source_file = os.path.join(source_dir, f"file_{i}.txt") + source_file = os.path.join(inputdata_root, f"file_{i}.txt") target_file = os.path.join(target_dir, f"file_{i}.txt") with open(source_file, "w", encoding="utf-8") as f: @@ -168,23 +240,70 @@ def test_multiple_files(temp_dirs, current_user): f.write(f"target {i}") # Run the function - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) - # Verify all files are symlinks + # Verify replace_one_file_with_symlink() was called correctly + calls = [] for i in range(5): - source_file = os.path.join(source_dir, f"file_{i}.txt") - target_file = os.path.join(target_dir, f"file_{i}.txt") - assert os.path.islink(source_file) - assert os.readlink(source_file) == target_file + source_file = os.path.join(inputdata_root, f"file_{i}.txt") + calls.append(call(inputdata_root, target_dir, source_file, dry_run=False)) + mock_replace_one.assert_has_calls(calls, any_order=True) + + +def test_multiple_files_nested(temp_dirs, current_user, mock_replace_one): + """Test with multiple files scattered throughout a nested directory tree.""" + inputdata_root, target_dir = temp_dirs + username = current_user + # Create nested directory structure with files at different levels + test_files = [ + "root_file1.txt", + "root_file2.txt", + os.path.join("level1", "file_a.txt"), + os.path.join("level1", "file_b.txt"), + os.path.join("level1", "subdir", "file_c.txt"), + os.path.join("level2", "deep", "nested", "file_d.txt"), + os.path.join("level2", "file_e.txt"), + ] + + # Create all files and their parent directories + source_files = [] + for rel_path in test_files: + source_file = os.path.join(inputdata_root, rel_path) + source_files.append(source_file) + target_file = os.path.join(target_dir, rel_path) + + # Create parent directories + os.makedirs(os.path.dirname(source_file), exist_ok=True) + os.makedirs(os.path.dirname(target_file), exist_ok=True) + + # Create files + with open(source_file, "w", encoding="utf-8") as f: + f.write(f"source content for {rel_path}") + with open(target_file, "w", encoding="utf-8") as f: + f.write(f"target content for {rel_path}") -def test_absolute_paths(temp_dirs, current_user): + # Run the function + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) + + # Verify replace_one_file_with_symlink() was called correctly + calls = [] + for source_file in source_files: + calls.append(call(inputdata_root, target_dir, source_file, dry_run=False)) + mock_replace_one.assert_has_calls(calls, any_order=True) + + +def test_absolute_paths(temp_dirs, current_user, mock_replace_one): """Test that function handles relative paths by converting to absolute.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Create test files - source_file = os.path.join(source_dir, "test.txt") + source_file = os.path.join(inputdata_root, "test.txt") target_file = os.path.join(target_dir, "test.txt") with open(source_file, "w", encoding="utf-8") as f: @@ -195,127 +314,63 @@ def test_absolute_paths(temp_dirs, current_user): # Use relative paths (if possible) cwd = os.getcwd() try: - os.chdir(os.path.dirname(source_dir)) - rel_source = os.path.basename(source_dir) + os.chdir(os.path.dirname(inputdata_root)) + rel_source = os.path.basename(inputdata_root) rel_target = os.path.basename(target_dir) # Run with relative paths - relink.replace_files_with_symlinks(rel_source, rel_target, username) - - # Verify it still works - assert os.path.islink(source_file) + relink.replace_files_with_symlinks( + rel_source, rel_target, username, inputdata_root=inputdata_root + ) finally: os.chdir(cwd) + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) + def test_print_searching_message(temp_dirs, current_user, caplog): """Test that searching message is printed.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = current_user # Run the function with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) # Check that searching message was logged assert f"Searching for files owned by '{username}'" in caplog.text - assert f"in '{os.path.abspath(source_dir)}'" in caplog.text - - -def test_print_found_owned_file(temp_dirs, current_user, caplog): - """Test that 'Found owned file' message is printed.""" - source_dir, target_dir = temp_dirs - username = current_user - - # Create a file owned by current user - source_file = os.path.join(source_dir, "owned_file.txt") - target_file = os.path.join(target_dir, "owned_file.txt") - - with open(source_file, "w", encoding="utf-8") as f: - f.write("content") - with open(target_file, "w", encoding="utf-8") as f: - f.write("target content") - - # Run the function - with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Check that "Found owned file" message was logged - assert "Found owned file:" in caplog.text - assert source_file in caplog.text - - -def test_print_deleted_and_created_messages(temp_dirs, current_user, caplog): - """Test that deleted and created symlink messages are printed.""" - source_dir, target_dir = temp_dirs - username = current_user - - # Create files - source_file = os.path.join(source_dir, "test_file.txt") - target_file = os.path.join(target_dir, "test_file.txt") - - with open(source_file, "w", encoding="utf-8") as f: - f.write("source") - with open(target_file, "w", encoding="utf-8") as f: - f.write("target") - - # Run the function - with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Check messages - assert "Deleted original file:" in caplog.text - assert "Created symbolic link:" in caplog.text - assert f"{source_file} -> {target_file}" in caplog.text - - -def test_error_creating_symlink(temp_dirs, caplog): - """Test error message when symlink creation fails.""" - source_dir, target_dir = temp_dirs - username = os.environ["USER"] - - # Create source file - source_file = os.path.join(source_dir, "test.txt") - target_file = os.path.join(target_dir, "test.txt") - - with open(source_file, "w", encoding="utf-8") as f: - f.write("source") - with open(target_file, "w", encoding="utf-8") as f: - f.write("target") - - # Mock os.symlink to raise an error - def mock_symlink(src, dst): - raise OSError("Simulated symlink error") - - with patch("os.symlink", side_effect=mock_symlink): - # Run the function - with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + assert f"in '{os.path.abspath(inputdata_root)}'" in caplog.text - # Check error message - assert "Error creating symlink" in caplog.text - assert source_file in caplog.text - -def test_empty_directories(temp_dirs): +def test_empty_directories(temp_dirs, mock_replace_one): """Test with empty directories.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = os.environ["USER"] # Run with empty directories (should not crash) - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) - # Should complete without errors - assert True + # Verify replace_one_file_with_symlink() wasn't called + mock_replace_one.assert_not_called() -def test_file_with_spaces_in_name(temp_dirs): +def test_file_with_spaces_in_name(temp_dirs, mock_replace_one): """Test files with spaces in their names.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = os.environ["USER"] # Create files with spaces - source_file = os.path.join(source_dir, "file with spaces.txt") + source_file = os.path.join(inputdata_root, "file with spaces.txt") target_file = os.path.join(target_dir, "file with spaces.txt") with open(source_file, "w", encoding="utf-8") as f: @@ -324,21 +379,27 @@ def test_file_with_spaces_in_name(temp_dirs): f.write("target content") # Run the function - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) - # Verify - assert os.path.islink(source_file) - assert os.readlink(source_file) == target_file + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) -def test_file_with_special_characters(temp_dirs): +def test_file_with_special_characters(temp_dirs, mock_replace_one): """Test files with special characters in names.""" - source_dir, target_dir = temp_dirs + inputdata_root, target_dir = temp_dirs username = os.environ["USER"] # Create files with special chars (that are valid in filenames) filename = "file-with_special.chars@123.txt" - source_file = os.path.join(source_dir, filename) + source_file = os.path.join(inputdata_root, filename) target_file = os.path.join(target_dir, filename) with open(source_file, "w", encoding="utf-8") as f: @@ -347,36 +408,14 @@ def test_file_with_special_characters(temp_dirs): f.write("target content") # Run the function - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Verify - assert os.path.islink(source_file) - assert os.readlink(source_file) == target_file - - -def test_error_deleting_file(temp_dirs, caplog): - """Test error message when file deletion fails.""" - source_dir, target_dir = temp_dirs - username = os.environ["USER"] - - # Create files - source_file = os.path.join(source_dir, "test.txt") - target_file = os.path.join(target_dir, "test.txt") - - with open(source_file, "w", encoding="utf-8") as f: - f.write("source") - with open(target_file, "w", encoding="utf-8") as f: - f.write("target") - - # Mock os.rename to raise an error - def mock_rename(src, dst): - raise OSError("Simulated rename error") - - with patch("os.rename", side_effect=mock_rename): - # Run the function - with caplog.at_level(logging.INFO): - relink.replace_files_with_symlinks(source_dir, target_dir, username) - - # Check error message - assert "Error deleting file" in caplog.text - assert source_file in caplog.text + relink.replace_files_with_symlinks( + inputdata_root, target_dir, username, inputdata_root=inputdata_root + ) + + # Verify replace_one_file_with_symlink() was called correctly + mock_replace_one.assert_called_once_with( + inputdata_root, + target_dir, + source_file, + dry_run=False, + ) diff --git a/tests/relink/test_replace_one_file_with_symlink.py b/tests/relink/test_replace_one_file_with_symlink.py new file mode 100644 index 0000000..70caaba --- /dev/null +++ b/tests/relink/test_replace_one_file_with_symlink.py @@ -0,0 +1,258 @@ +""" +Tests of replace_one_file_with_symlink() in relink.py +""" + +import os +import sys +import logging +from unittest.mock import patch + +# Add parent directory to path to import relink module +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +# pylint: disable=wrong-import-position +import relink # noqa: E402 + + +def test_basic_file_replacement(temp_dirs): + """Test basic functionality: replace owned file with symlink.""" + source_dir, target_dir = temp_dirs + + # Create a file in source directory + source_file = os.path.join(source_dir, "test_file.txt") + with open(source_file, "w", encoding="utf-8") as f: + f.write("source content") + + # Create corresponding file in target directory + target_file = os.path.join(target_dir, "test_file.txt") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + # Run the function + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Verify the source file is now a symlink + assert os.path.islink(source_file), "Source file should be a symlink" + assert ( + os.readlink(source_file) == target_file + ), "Symlink should point to target file" + + +def test_nested_directory_structure(temp_dirs): + """Test with nested directory structures.""" + source_dir, target_dir = temp_dirs + + # Create nested directories + nested_path = os.path.join("subdir1", "subdir2") + os.makedirs(os.path.join(source_dir, nested_path)) + os.makedirs(os.path.join(target_dir, nested_path)) + + # Create files in nested directories + source_file = os.path.join(source_dir, nested_path, "nested_file.txt") + target_file = os.path.join(target_dir, nested_path, "nested_file.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("nested source") + with open(target_file, "w", encoding="utf-8") as f: + f.write("nested target") + + # Run the function + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Verify + assert os.path.islink(source_file), "Nested file should be a symlink" + assert os.readlink(source_file) == target_file + + +def test_missing_target_file(temp_dirs, caplog): + """Test behavior when target file doesn't exist.""" + source_dir, target_dir = temp_dirs + + # Create only source file (no corresponding target) + source_file = os.path.join(source_dir, "orphan.txt") + with open(source_file, "w", encoding="utf-8") as f: + f.write("orphan content") + + # Run the function + with caplog.at_level(logging.INFO): + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Verify the file is NOT converted to symlink + assert not os.path.islink(source_file), "File should not be a symlink" + assert os.path.isfile(source_file), "Original file should still exist" + + # Check warning message + assert "Warning: Corresponding file " in caplog.text + assert " not found" in caplog.text + + +def test_absolute_paths(temp_dirs): + """Test that function handles relative paths by converting to absolute.""" + source_dir, target_dir = temp_dirs + + # Create test files + source_file = os.path.join(source_dir, "test.txt") + target_file = os.path.join(target_dir, "test.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("test") + with open(target_file, "w", encoding="utf-8") as f: + f.write("test target") + + # Use relative paths (if possible) + cwd = os.getcwd() + try: + os.chdir(os.path.dirname(source_dir)) + rel_source = os.path.basename(source_dir) + rel_target = os.path.basename(target_dir) + + # Run with relative paths + relink.replace_one_file_with_symlink(rel_source, rel_target, source_file) + + # Verify it still works + assert os.path.islink(source_file) + finally: + os.chdir(cwd) + + +def test_print_found_owned_file(temp_dirs, caplog): + """Test that 'Found owned file' message is printed.""" + source_dir, target_dir = temp_dirs + + # Create a file owned by current user + source_file = os.path.join(source_dir, "owned_file.txt") + target_file = os.path.join(target_dir, "owned_file.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("content") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + # Run the function + with caplog.at_level(logging.INFO): + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Check that "Found owned file" message was logged + assert "Found owned file:" in caplog.text + assert source_file in caplog.text + + +def test_print_deleted_and_created_messages(temp_dirs, caplog): + """Test that deleted and created symlink messages are printed.""" + source_dir, target_dir = temp_dirs + + # Create files + source_file = os.path.join(source_dir, "test_file.txt") + target_file = os.path.join(target_dir, "test_file.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("source") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target") + + # Run the function + with caplog.at_level(logging.INFO): + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Check messages + assert "Deleted original file:" in caplog.text + assert "Created symbolic link:" in caplog.text + assert f"{source_file} -> {target_file}" in caplog.text + + +def test_error_creating_symlink(temp_dirs, caplog): + """Test error message when symlink creation fails.""" + source_dir, target_dir = temp_dirs + + # Create source file + source_file = os.path.join(source_dir, "test.txt") + target_file = os.path.join(target_dir, "test.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("source") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target") + + # Mock os.symlink to raise an error + def mock_symlink(src, dst): + raise OSError("Simulated symlink error") + + with patch("os.symlink", side_effect=mock_symlink): + # Run the function + with caplog.at_level(logging.INFO): + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Check error message + assert "Error creating symlink" in caplog.text + assert source_file in caplog.text + + +def test_file_with_spaces_in_name(temp_dirs): + """Test files with spaces in their names.""" + source_dir, target_dir = temp_dirs + + # Create files with spaces + source_file = os.path.join(source_dir, "file with spaces.txt") + target_file = os.path.join(target_dir, "file with spaces.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("content") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + # Run the function + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Verify + assert os.path.islink(source_file) + assert os.readlink(source_file) == target_file + + +def test_file_with_special_characters(temp_dirs): + """Test files with special characters in names.""" + source_dir, target_dir = temp_dirs + + # Create files with special chars (that are valid in filenames) + filename = "file-with_special.chars@123.txt" + source_file = os.path.join(source_dir, filename) + target_file = os.path.join(target_dir, filename) + + with open(source_file, "w", encoding="utf-8") as f: + f.write("content") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + # Run the function + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Verify + assert os.path.islink(source_file) + assert os.readlink(source_file) == target_file + + +def test_error_deleting_file(temp_dirs, caplog): + """Test error message when file deletion fails.""" + source_dir, target_dir = temp_dirs + + # Create files + source_file = os.path.join(source_dir, "test.txt") + target_file = os.path.join(target_dir, "test.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("source") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target") + + # Mock os.rename to raise an error + def mock_rename(src, dst): + raise OSError("Simulated rename error") + + with patch("os.rename", side_effect=mock_rename): + # Run the function + with caplog.at_level(logging.INFO): + relink.replace_one_file_with_symlink(source_dir, target_dir, source_file) + + # Check error message + assert "Error deleting file" in caplog.text + assert source_file in caplog.text diff --git a/tests/relink/test_timing.py b/tests/relink/test_timing.py index e071944..e7e9cff 100644 --- a/tests/relink/test_timing.py +++ b/tests/relink/test_timing.py @@ -37,10 +37,11 @@ def test_timing_logging(tmp_path, caplog, use_timing, should_log_timing): # Build argv with or without --timing flag test_argv = [ "relink.py", - "--source-root", str(source_dir), "--target-root", str(target_dir), + "--inputdata-root", + str(source_dir), ] if use_timing: test_argv.append("--timing") @@ -75,12 +76,13 @@ def test_timing_shows_in_quiet_mode(tmp_path, caplog): # Build argv with both --timing and --quiet flags test_argv = [ "relink.py", - "--source-root", str(source_dir), "--target-root", str(target_dir), "--timing", "--quiet", + "--inputdata-root", + str(source_dir), ] with patch("sys.argv", test_argv): diff --git a/tests/relink/test_verbosity.py b/tests/relink/test_verbosity.py index 073f7ca..0234b7d 100644 --- a/tests/relink/test_verbosity.py +++ b/tests/relink/test_verbosity.py @@ -37,7 +37,9 @@ def test_quiet_mode_suppresses_info_messages(temp_dirs, caplog): # Run the function with WARNING level (quiet mode) with caplog.at_level(logging.WARNING): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + source_dir, target_dir, username, inputdata_root=source_dir + ) # Verify INFO messages are NOT in the log assert "Searching for files owned by" not in caplog.text @@ -59,10 +61,13 @@ def test_quiet_mode_shows_warnings(temp_dirs, caplog): # Run the function with WARNING level (quiet mode) with caplog.at_level(logging.WARNING): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + source_dir, target_dir, username, inputdata_root=source_dir + ) # Verify WARNING message IS in the log - assert "Warning: Corresponding file not found" in caplog.text + assert "Warning: Corresponding file" in caplog.text + assert "not found" in caplog.text def test_quiet_mode_shows_errors(temp_dirs, caplog): @@ -73,7 +78,9 @@ def test_quiet_mode_shows_errors(temp_dirs, caplog): # Test 1: Invalid username error invalid_username = "nonexistent_user_12345" with caplog.at_level(logging.WARNING): - relink.replace_files_with_symlinks(source_dir, target_dir, invalid_username) + relink.replace_files_with_symlinks( + source_dir, target_dir, invalid_username, inputdata_root=source_dir + ) assert "Error: User" in caplog.text assert "not found" in caplog.text @@ -94,7 +101,9 @@ def mock_rename(src, dst): with patch("os.rename", side_effect=mock_rename): with caplog.at_level(logging.WARNING): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + source_dir, target_dir, username, inputdata_root=source_dir + ) assert "Error deleting file" in caplog.text # Clear the log for next test @@ -114,5 +123,7 @@ def mock_symlink(src, dst): with patch("os.symlink", side_effect=mock_symlink): with caplog.at_level(logging.WARNING): - relink.replace_files_with_symlinks(source_dir, target_dir, username) + relink.replace_files_with_symlinks( + source_dir, target_dir, username, inputdata_root=source_dir + ) assert "Error creating symlink" in caplog.text