diff --git a/relink.py b/relink.py index 1cc1e96..67bcc8b 100644 --- a/relink.py +++ b/relink.py @@ -9,14 +9,18 @@ import pwd import argparse import logging +import time -DEFAULT_SOURCE_ROOT = '/glade/campaign/cesm/cesmdata/cseg/inputdata/' -DEFAULT_TARGET_ROOT = '/glade/campaign/collections/gdex/data/d651077/cesmdata/inputdata/' +DEFAULT_SOURCE_ROOT = "/glade/campaign/cesm/cesmdata/cseg/inputdata/" +DEFAULT_TARGET_ROOT = ( + "/glade/campaign/collections/gdex/data/d651077/cesmdata/inputdata/" +) # Set up logger logger = logging.getLogger(__name__) -def find_and_replace_owned_files(source_dir, target_dir, username): + +def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False): """ Finds files owned by a specific user in a source directory tree, deletes them, and replaces them with symbolic links to the same @@ -26,6 +30,7 @@ def find_and_replace_owned_files(source_dir, target_dir, username): source_dir (str): The root of the directory tree to search for files. target_dir (str): The root of the directory tree containing the new files. username (str): The name of the user whose files will be processed. + dry_run (bool): If True, only show what would be done without making changes. """ source_dir = os.path.abspath(source_dir) target_dir = os.path.abspath(target_dir) @@ -37,11 +42,14 @@ def find_and_replace_owned_files(source_dir, target_dir, username): logger.error("Error: User '%s' not found. Exiting.", username) return + if dry_run: + logger.info("DRY RUN MODE - No changes will be made") + logger.info( "Searching for files owned by '%s' (UID: %s) in '%s'...", username, user_uid, - source_dir + source_dir, ) for dirpath, _, filenames in os.walk(source_dir): @@ -56,7 +64,7 @@ def find_and_replace_owned_files(source_dir, target_dir, username): file_uid = os.stat(file_path).st_uid except FileNotFoundError: - continue # Skip if file was deleted during traversal + continue # Skip if file was deleted during traversal if file_uid == user_uid: logger.info("Found owned file: %s", file_path) @@ -71,16 +79,24 @@ def find_and_replace_owned_files(source_dir, target_dir, username): "Warning: Corresponding file not found in '%s' " "for '%s'. Skipping.", target_dir, - file_path + file_path, ) continue # Get the link name link_name = file_path + if dry_run: + logger.info( + "[DRY RUN] Would create symbolic link: %s -> %s", + link_name, + link_target, + ) + continue + # Remove the original file try: - os.rename(link_name, link_name+".tmp") + os.rename(link_name, link_name + ".tmp") logger.info("Deleted original file: %s", link_name) except OSError as e: logger.error("Error deleting file %s: %s. Skipping.", link_name, e) @@ -91,11 +107,36 @@ def find_and_replace_owned_files(source_dir, target_dir, username): # Create parent directories for the link if they don't exist os.makedirs(os.path.dirname(link_name), exist_ok=True) os.symlink(link_target, link_name) - os.remove(link_name+".tmp") - logger.info("Created symbolic link: %s -> %s", link_name, link_target) + os.remove(link_name + ".tmp") + logger.info( + "Created symbolic link: %s -> %s", link_name, link_target + ) except OSError as e: - os.rename(link_name+".tmp", link_name) - logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e) + os.rename(link_name + ".tmp", link_name) + logger.error( + "Error creating symlink for %s: %s. Skipping.", link_name, e + ) + + +def validate_directory(path): + """ + Validate that the path exists and is a directory. + + Args: + path (str): The path to validate. + + Returns: + str: The absolute path if valid. + + Raises: + argparse.ArgumentTypeError: If path doesn't exist or is not a directory. + """ + if not os.path.exists(path): + raise argparse.ArgumentTypeError(f"Directory '{path}' does not exist") + if not os.path.isdir(path): + raise argparse.ArgumentTypeError(f"'{path}' is not a directory") + return os.path.abspath(path) + def parse_arguments(): """ @@ -107,59 +148,94 @@ def parse_arguments(): """ parser = argparse.ArgumentParser( description=( - 'Find files owned by a user and replace them with symbolic links to a target directory.' + "Find files owned by a user and replace them with symbolic links to a target directory." ) ) parser.add_argument( - '--source-root', + "--source-root", + type=validate_directory, default=DEFAULT_SOURCE_ROOT, help=( - f'The root of the directory tree to search for files (default: {DEFAULT_SOURCE_ROOT})' - ) + f"The root of the directory tree to search for files (default: {DEFAULT_SOURCE_ROOT})" + ), ) parser.add_argument( - '--target-root', + "--target-root", + type=validate_directory, default=DEFAULT_TARGET_ROOT, help=( - f'The root of the directory tree where files should be moved to ' - f'(default: {DEFAULT_TARGET_ROOT})' - ) + f"The root of the directory tree where files should be moved to " + f"(default: {DEFAULT_TARGET_ROOT})" + ), ) # Verbosity options (mutually exclusive) verbosity_group = parser.add_mutually_exclusive_group() verbosity_group.add_argument( - '-v', '--verbose', - action='store_true', - help='Enable verbose output' + "-v", "--verbose", action="store_true", help="Enable verbose output" ) verbosity_group.add_argument( - '-q', '--quiet', - action='store_true', - help='Quiet mode (show only warnings and errors)' + "-q", + "--quiet", + action="store_true", + help="Quiet mode (show only warnings and errors)", + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be done without making any changes", + ) + parser.add_argument( + "--timing", + action="store_true", + help="Measure and display the execution time", ) - return parser.parse_args() + args = parser.parse_args() -if __name__ == '__main__': + process_args(args) + + return args - args = parse_arguments() +def process_args(args): + """ + Process parsed arguments and set derived attributes. + + Sets the log_level attribute on args based on verbosity flags. + + Args: + args (argparse.Namespace): Parsed command-line arguments. + """ # Configure logging based on verbosity flags if args.quiet: - LOG_LEVEL = logging.WARNING + args.log_level = logging.WARNING elif args.verbose: - LOG_LEVEL = logging.DEBUG + args.log_level = logging.DEBUG else: - LOG_LEVEL = logging.INFO + args.log_level = logging.INFO - logging.basicConfig( - level=LOG_LEVEL, - format='%(message)s', - stream=sys.stdout - ) - - my_username = os.environ['USER'] + +def main(): + + args = parse_arguments() + + logging.basicConfig(level=args.log_level, format="%(message)s", stream=sys.stdout) + + my_username = os.environ["USER"] + + start_time = time.time() # --- Execution --- - find_and_replace_owned_files(args.source_root, args.target_root, my_username) + find_and_replace_owned_files( + args.source_root, args.target_root, my_username, dry_run=args.dry_run + ) + + if args.timing: + elapsed_time = time.time() - start_time + logger.info("Execution time: %.2f seconds", elapsed_time) + + +if __name__ == "__main__": + main() diff --git a/tests/test_relink.py b/tests/test_relink.py index 6a78253..1c4e5bf 100644 --- a/tests/test_relink.py +++ b/tests/test_relink.py @@ -8,12 +8,15 @@ import shutil import pwd import logging +import argparse +import subprocess from unittest.mock import patch import pytest # Add parent directory to path to import relink module sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# pylint: disable=wrong-import-position import relink # noqa: E402 @@ -32,20 +35,36 @@ def configure_logging(): logging.getLogger().handlers.clear() -class TestFindAndReplaceOwnedFiles: - """Test suite for find_and_replace_owned_files function.""" +@pytest.fixture(scope="function", name="mock_default_dirs") +def fixture_mock_default_dirs(): + """Mock the default directories to use temporary directories.""" + source_dir = tempfile.mkdtemp(prefix="test_default_source_") + target_dir = tempfile.mkdtemp(prefix="test_default_target_") + + with patch.object(relink, "DEFAULT_SOURCE_ROOT", source_dir): + with patch.object(relink, "DEFAULT_TARGET_ROOT", target_dir): + yield source_dir, target_dir + + # Cleanup + shutil.rmtree(source_dir, ignore_errors=True) + shutil.rmtree(target_dir, ignore_errors=True) - @pytest.fixture - def temp_dirs(self): - """Create temporary source and target directories for testing.""" - source_dir = tempfile.mkdtemp(prefix="test_source_") - target_dir = tempfile.mkdtemp(prefix="test_target_") - yield source_dir, target_dir +@pytest.fixture(scope="function", name="temp_dirs") +def fixture_temp_dirs(): + """Create temporary source and target directories for testing.""" + source_dir = tempfile.mkdtemp(prefix="test_source_") + target_dir = tempfile.mkdtemp(prefix="test_target_") - # Cleanup - shutil.rmtree(source_dir, ignore_errors=True) - shutil.rmtree(target_dir, ignore_errors=True) + yield source_dir, target_dir + + # Cleanup + shutil.rmtree(source_dir, ignore_errors=True) + shutil.rmtree(target_dir, ignore_errors=True) + + +class TestFindAndReplaceOwnedFiles: + """Test suite for find_and_replace_owned_files function.""" @pytest.fixture def current_user(self): @@ -297,109 +316,203 @@ def test_print_deleted_and_created_messages(self, temp_dirs, current_user, caplo assert "Created symbolic link:" in caplog.text assert f"{source_file} -> {target_file}" in caplog.text + def test_handles_file_deleted_during_traversal( + self, temp_dirs, current_user, caplog + ): + """Test that FileNotFoundError during stat is handled gracefully.""" + source_dir, target_dir = temp_dirs + username = current_user + + # Create files + source_file = os.path.join(source_dir, "disappearing.txt") + with open(source_file, "w", encoding="utf-8") as f: + f.write("content") + + # Mock os.stat to raise FileNotFoundError for this specific file + original_stat = os.stat + + def mock_stat(path, *args, **kwargs): + if path == source_file: + raise FileNotFoundError(f"Simulated: {path} deleted during traversal") + return original_stat(path, *args, **kwargs) + + with patch("os.stat", side_effect=mock_stat): + with caplog.at_level(logging.INFO): + # Should not crash, should continue processing + relink.find_and_replace_owned_files(source_dir, target_dir, username) + + # Should complete without errors (file was skipped) + # No error message should be logged (it's silently skipped via continue) + assert "Error" not in caplog.text + assert "disappearing.txt" not in caplog.text + + def test_error_creating_symlink(self, temp_dirs, caplog): + """Test error message when symlink creation fails.""" + source_dir, target_dir = temp_dirs + username = os.environ["USER"] + + # Create source file + source_file = os.path.join(source_dir, "test.txt") + target_file = os.path.join(target_dir, "test.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("source") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target") + + # Mock os.symlink to raise an error + def mock_symlink(src, dst): + raise OSError("Simulated symlink error") + + with patch("os.symlink", side_effect=mock_symlink): + # Run the function + with caplog.at_level(logging.INFO): + relink.find_and_replace_owned_files(source_dir, target_dir, username) + + # Check error message + assert "Error creating symlink" in caplog.text + assert source_file in caplog.text + class TestParseArguments: """Test suite for parse_arguments function.""" - def test_default_arguments(self): + def test_default_arguments(self, mock_default_dirs): """Test that default arguments are used when none provided.""" + source_dir, target_dir = mock_default_dirs with patch("sys.argv", ["relink.py"]): args = relink.parse_arguments() - assert args.source_root == relink.DEFAULT_SOURCE_ROOT - assert args.target_root == relink.DEFAULT_TARGET_ROOT + assert args.source_root == source_dir + assert args.target_root == target_dir - def test_custom_source_root(self): + def test_custom_source_root(self, mock_default_dirs, tmp_path): """Test custom source root argument.""" - test_path = os.path.join(os.sep, "custom", "source", "path") - with patch("sys.argv", ["relink.py", "--source-root", test_path]): + _, target_dir = mock_default_dirs + custom_source = tmp_path / "custom_source" + custom_source.mkdir() + with patch("sys.argv", ["relink.py", "--source-root", str(custom_source)]): args = relink.parse_arguments() - assert args.source_root == test_path - assert args.target_root == relink.DEFAULT_TARGET_ROOT + assert args.source_root == str(custom_source.resolve()) + assert args.target_root == target_dir - def test_custom_target_root(self): + def test_custom_target_root(self, mock_default_dirs, tmp_path): """Test custom target root argument.""" - test_path = os.path.join(os.sep, "custom", "target", "path") - with patch("sys.argv", ["relink.py", "--target-root", test_path]): + source_dir, _ = mock_default_dirs + custom_target = tmp_path / "custom_target" + custom_target.mkdir() + with patch("sys.argv", ["relink.py", "--target-root", str(custom_target)]): args = relink.parse_arguments() - assert args.source_root == relink.DEFAULT_SOURCE_ROOT - assert args.target_root == test_path + assert args.source_root == source_dir + assert args.target_root == str(custom_target.resolve()) - def test_both_custom_paths(self): + def test_both_custom_paths(self, tmp_path): """Test both custom source and target roots.""" - source_path = os.path.join(os.sep, "custom", "source") - target_path = os.path.join(os.sep, "custom", "target") + source_path = tmp_path / "custom_source" + target_path = tmp_path / "custom_target" + source_path.mkdir() + target_path.mkdir() with patch( "sys.argv", - ["relink.py", "--source-root", source_path, "--target-root", target_path], + [ + "relink.py", + "--source-root", + str(source_path), + "--target-root", + str(target_path), + ], ): args = relink.parse_arguments() - assert args.source_root == source_path - assert args.target_root == target_path + assert args.source_root == str(source_path.resolve()) + assert args.target_root == str(target_path.resolve()) - def test_verbose_flag(self): + def test_verbose_flag(self, mock_default_dirs): # pylint: disable=unused-argument """Test that --verbose flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "--verbose"]): args = relink.parse_arguments() assert args.verbose is True assert args.quiet is False - def test_quiet_flag(self): + def test_quiet_flag(self, mock_default_dirs): # pylint: disable=unused-argument """Test that --quiet flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "--quiet"]): args = relink.parse_arguments() assert args.quiet is True assert args.verbose is False - def test_verbose_short_flag(self): + def test_verbose_short_flag( + self, mock_default_dirs + ): # pylint: disable=unused-argument """Test that -v flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "-v"]): args = relink.parse_arguments() assert args.verbose is True - def test_quiet_short_flag(self): + def test_quiet_short_flag( + self, mock_default_dirs + ): # pylint: disable=unused-argument """Test that -q flag is parsed correctly.""" with patch("sys.argv", ["relink.py", "-q"]): args = relink.parse_arguments() assert args.quiet is True - def test_default_verbosity(self): + def test_default_verbosity( + self, mock_default_dirs + ): # pylint: disable=unused-argument """Test that default verbosity has both flags as False.""" with patch("sys.argv", ["relink.py"]): args = relink.parse_arguments() assert args.verbose is False assert args.quiet is False - def test_verbose_and_quiet_mutually_exclusive(self): + def test_verbose_and_quiet_mutually_exclusive(self, mock_default_dirs): """Test that --verbose and --quiet cannot be used together.""" + # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "--verbose", "--quiet"]): with pytest.raises(SystemExit) as exc_info: relink.parse_arguments() # Mutually exclusive arguments cause SystemExit with code 2 assert exc_info.value.code == 2 - def test_verbose_and_quiet_short_flags_mutually_exclusive(self): + def test_verbose_and_quiet_short_flags_mutually_exclusive(self, mock_default_dirs): """Test that -v and -q cannot be used together.""" + # pylint: disable=unused-argument with patch("sys.argv", ["relink.py", "-v", "-q"]): with pytest.raises(SystemExit) as exc_info: relink.parse_arguments() # Mutually exclusive arguments cause SystemExit with code 2 assert exc_info.value.code == 2 + def test_dry_run_flag(self, mock_default_dirs): + """Test that --dry-run flag is parsed correctly.""" + # pylint: disable=unused-argument + with patch("sys.argv", ["relink.py", "--dry-run"]): + args = relink.parse_arguments() + assert args.dry_run is True + + def test_dry_run_default(self, mock_default_dirs): + """Test that dry_run defaults to False.""" + # pylint: disable=unused-argument + with patch("sys.argv", ["relink.py"]): + args = relink.parse_arguments() + assert args.dry_run is False -class TestVerbosityLevels: - """Test suite for verbosity level behavior.""" + def test_timing_flag(self, mock_default_dirs): + """Test that --timing flag is parsed correctly.""" + # pylint: disable=unused-argument + with patch("sys.argv", ["relink.py", "--timing"]): + args = relink.parse_arguments() + assert args.timing is True - @pytest.fixture - def temp_dirs(self): - """Create temporary source and target directories for testing.""" - source_dir = tempfile.mkdtemp(prefix="test_source_") - target_dir = tempfile.mkdtemp(prefix="test_target_") + def test_timing_default(self, mock_default_dirs): + """Test that timing defaults to False.""" + # pylint: disable=unused-argument + with patch("sys.argv", ["relink.py"]): + args = relink.parse_arguments() + assert args.timing is False - yield source_dir, target_dir - # Cleanup - shutil.rmtree(source_dir, ignore_errors=True) - shutil.rmtree(target_dir, ignore_errors=True) +class TestVerbosityLevels: + """Test suite for verbosity level behavior.""" def test_quiet_mode_suppresses_info_messages(self, temp_dirs, caplog): """Test that quiet mode suppresses INFO level messages.""" @@ -506,18 +619,6 @@ def mock_symlink(src, dst): class TestEdgeCases: """Test edge cases and error handling.""" - @pytest.fixture - def temp_dirs(self): - """Create temporary source and target directories for testing.""" - source_dir = tempfile.mkdtemp(prefix="test_source_") - target_dir = tempfile.mkdtemp(prefix="test_target_") - - yield source_dir, target_dir - - # Cleanup - shutil.rmtree(source_dir, ignore_errors=True) - shutil.rmtree(target_dir, ignore_errors=True) - def test_empty_directories(self, temp_dirs): """Test with empty directories.""" source_dir, target_dir = temp_dirs @@ -625,3 +726,301 @@ def mock_symlink(src, dst): # Check error message assert "Error creating symlink" in caplog.text assert source_file in caplog.text + + +class TestTiming: + """Test suite for timing functionality.""" + + @pytest.mark.parametrize( + "use_timing, should_log_timing", [(True, True), (False, False)] + ) + def test_timing_logging(self, tmp_path, caplog, use_timing, should_log_timing): + """Test that timing message is logged only when --timing flag is used.""" + # Create real directories + source_dir = tmp_path / "source" + target_dir = tmp_path / "target" + source_dir.mkdir() + target_dir.mkdir() + + # Create a file + source_file = source_dir / "test_file.txt" + target_file = target_dir / "test_file.txt" + source_file.write_text("source") + target_file.write_text("target") + + # Build argv with or without --timing flag + test_argv = [ + "relink.py", + "--source-root", + str(source_dir), + "--target-root", + str(target_dir), + ] + if use_timing: + test_argv.append("--timing") + + with patch("sys.argv", test_argv): + with caplog.at_level(logging.INFO): + # Call main() which includes the timing logic + relink.main() + + # Verify timing message presence based on flag + if should_log_timing: + assert "Execution time:" in caplog.text + assert "seconds" in caplog.text + else: + assert "Execution time:" not in caplog.text + + +class TestValidateDirectory: + """Test suite for validate_directory function.""" + + def test_valid_directory(self, tmp_path): + """Test that valid directory is accepted and returns absolute path.""" + test_dir = tmp_path / "valid_dir" + test_dir.mkdir() + + result = relink.validate_directory(str(test_dir)) + assert result == str(test_dir.resolve()) + + def test_nonexistent_directory(self): + """Test that nonexistent directory raises ArgumentTypeError.""" + nonexistent = os.path.join(os.sep, "nonexistent", "directory", "12345") + + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.validate_directory(nonexistent) + + assert "does not exist" in str(exc_info.value) + assert nonexistent in str(exc_info.value) + + def test_file_instead_of_directory(self, tmp_path): + """Test that a file path raises ArgumentTypeError.""" + test_file = tmp_path / "test_file.txt" + test_file.write_text("content") + + with pytest.raises(argparse.ArgumentTypeError) as exc_info: + relink.validate_directory(str(test_file)) + + assert "not a directory" in str(exc_info.value) + + def test_relative_path_converted_to_absolute(self, tmp_path): + """Test that relative paths are converted to absolute.""" + test_dir = tmp_path / "relative_test" + test_dir.mkdir() + + # Change to parent directory and use relative path + cwd = os.getcwd() + try: + os.chdir(str(tmp_path)) + result = relink.validate_directory("relative_test") + assert os.path.isabs(result) + assert result == str(test_dir.resolve()) + finally: + os.chdir(cwd) + + def test_symlink_to_directory(self, tmp_path): + """Test that symlink to a directory is accepted.""" + real_dir = tmp_path / "real_dir" + real_dir.mkdir() + + link_dir = tmp_path / "link_dir" + link_dir.symlink_to(real_dir) + + result = relink.validate_directory(str(link_dir)) + # validate_directory returns absolute path of the symlink itself + assert result == str(link_dir.absolute()) + # Verify it's still a symlink + assert os.path.islink(result) + + +class TestDryRun: + """Test suite for dry-run functionality.""" + + @pytest.fixture + def dry_run_setup(self, temp_dirs): + """Set up directories and files for dry-run tests.""" + source_dir, target_dir = temp_dirs + username = os.environ["USER"] + + # Create files + source_file = os.path.join(source_dir, "test_file.txt") + target_file = os.path.join(target_dir, "test_file.txt") + + with open(source_file, "w", encoding="utf-8") as f: + f.write("source content") + with open(target_file, "w", encoding="utf-8") as f: + f.write("target content") + + return source_dir, target_dir, source_file, target_file, username + + def test_dry_run_no_changes(self, dry_run_setup, caplog): + """Test that dry-run mode makes no actual changes.""" + source_dir, target_dir, source_file, _, username = dry_run_setup + + # Get original file info + with open(source_file, "r", encoding="utf-8") as f: + original_content = f.read() + original_is_link = os.path.islink(source_file) + + # Run in dry-run mode + with caplog.at_level(logging.INFO): + relink.find_and_replace_owned_files( + source_dir, target_dir, username, dry_run=True + ) + + # Verify no changes were made + assert os.path.isfile(source_file), "Original file should still exist" + assert not os.path.islink(source_file), "File should not be a symlink" + with open(source_file, "r", encoding="utf-8") as f: + assert f.read() == original_content + assert os.path.islink(source_file) == original_is_link + + def test_dry_run_shows_message(self, dry_run_setup, caplog): + """Test that dry-run mode shows what would be done.""" + source_dir, target_dir, source_file, target_file, username = dry_run_setup + + # Run in dry-run mode + with caplog.at_level(logging.INFO): + relink.find_and_replace_owned_files( + source_dir, target_dir, username, dry_run=True + ) + + # Check that dry-run messages were logged + assert "DRY RUN MODE" in caplog.text + assert "[DRY RUN] Would create symbolic link:" in caplog.text + assert f"{source_file} -> {target_file}" in caplog.text + + def test_dry_run_no_delete_or_create_messages(self, dry_run_setup, caplog): + """Test that dry-run doesn't show delete/create messages.""" + source_dir, target_dir, _, _, username = dry_run_setup + + # Run in dry-run mode + with caplog.at_level(logging.INFO): + relink.find_and_replace_owned_files( + source_dir, target_dir, username, dry_run=True + ) + + # Verify actual operation messages are NOT logged + assert "Deleted original file:" not in caplog.text + assert "Created symbolic link:" not in caplog.text + # But the dry-run message should be there + assert "[DRY RUN] Would create symbolic link: " in caplog.text + + +class TestCommandLineExecution: + """Test suite for command-line execution of relink.py.""" + + @pytest.fixture + def mock_dirs(self, tmp_path): + """Create temporary directories and files for command-line testing.""" + source_dir = tmp_path / "source" + target_dir = tmp_path / "target" + source_dir.mkdir() + target_dir.mkdir() + + # Create a test file + source_file = source_dir / "test_file.txt" + target_file = target_dir / "test_file.txt" + source_file.write_text("source content") + target_file.write_text("target content") + + return source_dir, target_dir, source_file, target_file + + def test_command_line_execution_dry_run(self, mock_dirs): + """Test executing relink.py from command line with --dry-run flag.""" + source_dir, target_dir, source_file, _ = mock_dirs + + # Get the path to relink.py + relink_script = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "relink.py" + ) + + # Build the command + command = [ + sys.executable, + relink_script, + "--source-root", + str(source_dir), + "--target-root", + str(target_dir), + "--dry-run", + ] + + # Execute the command + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Verify the command executed successfully + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + + # Verify dry-run messages in output + assert "DRY RUN MODE" in result.stdout + assert "[DRY RUN] Would create symbolic link:" in result.stdout + + # Verify no actual changes were made + assert source_file.is_file() + assert not source_file.is_symlink() + + def test_command_line_execution_actual_run(self, mock_dirs): + """Test executing relink.py from command line without dry-run.""" + source_dir, target_dir, source_file, target_file = mock_dirs + + # Get the path to relink.py + relink_script = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "relink.py" + ) + + # Build the command + command = [ + sys.executable, + relink_script, + "--source-root", + str(source_dir), + "--target-root", + str(target_dir), + ] + + # Execute the command + result = subprocess.run(command, capture_output=True, text=True, check=False) + + # Verify the command executed successfully + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + + # Verify the file was converted to a symlink + assert source_file.is_symlink() + assert os.readlink(str(source_file)) == str(target_file) + + # Verify success messages in output + assert "Created symbolic link:" in result.stdout + + +class TestProcessArgs: + """Test suite for process_args function.""" + + # pylint: disable=no-member + + def test_process_args_quiet_sets_warning_level(self): + """Test that quiet flag sets log level to WARNING.""" + args = argparse.Namespace(quiet=True, verbose=False) + relink.process_args(args) + assert args.log_level == logging.WARNING + + def test_process_args_verbose_sets_debug_level(self): + """Test that verbose flag sets log level to DEBUG.""" + args = argparse.Namespace(quiet=False, verbose=True) + relink.process_args(args) + assert args.log_level == logging.DEBUG + + def test_process_args_default_sets_info_level(self): + """Test that default (no flags) sets log level to INFO.""" + args = argparse.Namespace(quiet=False, verbose=False) + relink.process_args(args) + assert args.log_level == logging.INFO + + def test_process_args_modifies_args_in_place(self): + """Test that process_args modifies the args object in place.""" + args = argparse.Namespace(quiet=False, verbose=False) + original_args = args + relink.process_args(args) + # Should be the same object, modified in place + assert args is original_args + assert hasattr(args, "log_level")