From 8f39bcc8fd938a2f12206e9e568764bd3260d8f9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 5 Jan 2026 14:54:50 +0000 Subject: [PATCH 01/14] Updated RSyncer cleanup logic: * Added 'substrings_blacklist' as an optional attribute upon initialisation of the RSyncer * When finalising, recursively delete directories and files that have patterns matching that of the substrings blacklist, while transferring the rest --- src/murfey/client/rsync.py | 47 +++++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index f9729cf0..af1ea93d 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -10,6 +10,7 @@ import logging import os import queue +import shutil import subprocess import threading import time @@ -61,6 +62,7 @@ def __init__( do_transfer: bool = True, remove_files: bool = False, required_substrings_for_removal: List[str] = [], + substrings_blacklist: dict[str, list[str]] = {}, notify: bool = True, end_time: datetime | None = None, ): @@ -68,16 +70,17 @@ def __init__( self._basepath = basepath_local.absolute() self._basepath_remote = basepath_remote self._rsync_module = rsync_module + self._server_url = server_url + self._stop_callback = stop_callback + self._local = local self._do_transfer = do_transfer self._remove_files = remove_files self._required_substrings_for_removal = required_substrings_for_removal - self._stop_callback = stop_callback - self._local = local - self._server_url = server_url + self._substrings_blacklist = substrings_blacklist self._notify = notify - self._finalised = False self._end_time = end_time self._finalising = False + self._finalised = False self._skipped_files: List[Path] = [] @@ -195,17 +198,49 @@ def finalise( self._notify = False self._end_time = None self._finalising = True + + # Perform recursive cleanup on current directory + files_to_transfer: list[Path] = [] + + def recursive_cleanup(dirpath: str | 
Path): + for entry in os.scandir(dirpath): + if entry.is_dir(): + # Recursively delete directories with blacklisted substrings + if any( + char in entry.name + for char in self._substrings_blacklist.get("directories", []) + ): + logger.debug(f"Deleting blacklisted directory {entry.path}") + shutil.rmtree(entry.path) + continue + # Recursively search in whitelisted ones + recursive_cleanup(entry.path) + elif entry.is_file(): + # Delete blacklisted files + if any( + char in entry.name + for char in self._substrings_blacklist.get("files", []) + ): + logger.debug(f"Deleting blacklisted file {entry.path}") + Path(entry.path).unlink() + continue + # Append others for transfer + files_to_transfer.append(Path(entry.path)) + + recursive_cleanup(self._basepath) + logger.debug(f"Number of files to transfer: {len(files_to_transfer)}") + if thread: self.thread = threading.Thread( name=f"RSync finalisation {self._basepath}:{self._remote}", target=self._process, daemon=True, ) - for f in self._basepath.glob("**/*"): + for f in files_to_transfer: self.queue.put(f) self.stop() else: - self._transfer(list(self._basepath.glob("**/*"))) + self._transfer(files_to_transfer) self._finalised = True if callback: callback() From 5814fd4ec1cde8ea903f388a44297fa5e00c29b4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 5 Jan 2026 14:56:32 +0000 Subject: [PATCH 02/14] Pass 'substrings_blacklist' from instrument machine config to RSyncer class upon its initialisation by the MultigridController --- src/murfey/client/multigrid_control.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index ad1020ab..f52414f5 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -380,6 +380,9 @@ def _start_rsyncer( stop_callback=self._rsyncer_stopped, do_transfer=self.do_transfer, remove_files=remove_files, + substrings_blacklist=self._machine_config.get( + 
"substrings_blacklist", {"directories": [], "files": []} + ), end_time=self.visit_end_time, ) From 0af3f767ed46bcbe9f23a97fc4839482288d6633 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 5 Jan 2026 16:51:47 +0000 Subject: [PATCH 03/14] Updated logger levels and message wording for some RSyncer logs --- src/murfey/client/rsync.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index af1ea93d..3c219a28 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -172,7 +172,7 @@ def restart(self): self.start() def stop(self): - logger.debug("RSync thread stop requested") + logger.info("RSync thread stop requested") self._stopping = True if self.thread.is_alive(): logger.info("Waiting for ongoing transfers to complete...") @@ -182,7 +182,7 @@ def stop(self): if self.thread.is_alive(): self.queue.put(None) self.thread.join() - logger.debug("RSync thread successfully stopped") + logger.info("RSync thread successfully stopped") def request_stop(self): self._stopping = True @@ -200,6 +200,7 @@ def finalise( self._finalising = True # Perform recursive cleanup on current directory + logger.info("File cleanup started for RSync thread") files_to_transfer: list[Path] = [] def recursive_cleanup(dirpath: str | Path): @@ -242,6 +243,7 @@ def recursive_cleanup(dirpath: str | Path): else: self._transfer(files_to_transfer) self._finalised = True + logger.info("File cleanup for RSync thread successfully completed") if callback: callback() @@ -256,7 +258,7 @@ def flush_skipped(self): self._skipped_files = [] def _process(self): - logger.info("RSync thread starting") + logger.info("Starting RSync thread main process loop") files_to_transfer: list[Path] backoff = 0 while not self._halt_thread: From 2f01b59b91ea952a7f4e7cd91ac604f226e7350d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 6 Jan 2026 11:34:31 +0000 Subject: [PATCH 04/14] Rearranged RSyncer instance attributes 
defined in '__init__' --- src/murfey/client/rsync.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 3c219a28..c1610565 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -79,18 +79,15 @@ def __init__( self._substrings_blacklist = substrings_blacklist self._notify = notify self._end_time = end_time - self._finalising = False - self._finalised = False self._skipped_files: List[Path] = [] # Set rsync destination - if local: - self._remote = str(basepath_remote) - else: - self._remote = ( - f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/" - ) + self._remote = ( + str(basepath_remote) + if local + else f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/" + ) logger.debug(f"rsync destination path set to {self._remote}") # For local tests you can use something along the lines of @@ -108,6 +105,8 @@ def __init__( ) self._stopping = False self._halt_thread = False + self._finalising = False + self._finalised = False def __repr__(self) -> str: return f" Date: Tue, 6 Jan 2026 12:02:56 +0000 Subject: [PATCH 05/14] Added unit test for the '__init__' and 'finalise' RSyncer class methods --- tests/client/test_rsync.py | 312 +++++++++++++++++++++++++++++++++++++ 1 file changed, 312 insertions(+) create mode 100644 tests/client/test_rsync.py diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py new file mode 100644 index 00000000..021535c1 --- /dev/null +++ b/tests/client/test_rsync.py @@ -0,0 +1,312 @@ +import queue +import threading +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.client.rsync import RSyncer +from tests.conftest import ExampleVisit + + +@pytest.fixture +def rsync_module(): + return "data" + + +@pytest.fixture +def mock_server_url(): + mock_url = MagicMock() + mock_url.hostname = 
"10.0.0.1" + return mock_url + + +# Create a dummy callback function +def dummy_callback(): + return None + + +@pytest.mark.parametrize("is_local", (True, False)) +def test_rsyncer_initialises( + tmp_path: Path, + rsync_module: str, + mock_server_url: MagicMock, + is_local: bool, +): + # Assign values to parameters + basepath_local = tmp_path / "local" + basepath_remote = tmp_path / "remote" + do_transfer = True + remove_files = True + + # Create a test substrings blacklist dict + substrings_blacklist = { + "directories": ["1", "2", "3"], + "files": ["a", "b", "c"], + } + + # Create a timestamp + timestamp = datetime.now() + + # Initialise the RSyncer + rsyncer = RSyncer( + basepath_local=basepath_local, + basepath_remote=basepath_remote, + rsync_module=rsync_module, + server_url=mock_server_url, + stop_callback=dummy_callback, + local=is_local, + do_transfer=do_transfer, + remove_files=remove_files, + substrings_blacklist=substrings_blacklist, + end_time=timestamp, + ) + + # Check that the attributes are as expected + assert rsyncer._basepath == basepath_local.absolute() + assert rsyncer._basepath_remote == basepath_remote + assert rsyncer._rsync_module == rsync_module + assert rsyncer._server_url == mock_server_url + assert rsyncer._stop_callback == dummy_callback + assert rsyncer._local == is_local + assert rsyncer._do_transfer == do_transfer + assert rsyncer._remove_files == remove_files + assert rsyncer._required_substrings_for_removal == [] + assert rsyncer._substrings_blacklist == substrings_blacklist + assert rsyncer._notify + assert rsyncer._end_time == timestamp + assert not rsyncer._finalising + assert not rsyncer._finalised + assert rsyncer._skipped_files == [] + assert ( + rsyncer._remote == str(basepath_remote) + if is_local + else f"{mock_server_url.hostname}::{rsync_module}/{basepath_remote}" + ) + assert rsyncer._files_transferred == 0 + assert rsyncer._bytes_transferred == 0 + assert isinstance(rsyncer.queue, queue.Queue) + assert 
isinstance(rsyncer.thread, threading.Thread) + assert not rsyncer._stopping + assert not rsyncer._halt_thread + + +@pytest.fixture +def clem_visit_dir(tmp_path: Path): + visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" + visit_dir = tmp_path / "local" / visit_name + visit_dir.mkdir(parents=True, exist_ok=True) + return visit_dir + + +@pytest.fixture +def clem_test_files(clem_visit_dir: Path): + # Create test files for the DirWatcher to scan + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Example atlas collection + for s in range(20): + file_list.append( + project_dir + / "Overview 1" + / "Image 1" + / f"Image 1--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Overview 1" / "Image 1" / "Metadata" / "Image 1.xlif" + ) + + # Example image stack collection + for c in range(3): + for z in range(10): + file_list.append( + project_dir + / "TileScan 1" + / "Position 1" + / f"Position 1--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir / "TileScan 1" / "Position 1" / "Metadata" / "Position 1.xlif" + ) + + # Create all files and directories specified + for file in file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +@pytest.fixture +def clem_junk_files(clem_visit_dir: Path): + # Create junk files that are to be blacklisted from the CLEM workflow + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Create junk atlas data + for n in range(5): + for s in range(20): + file_list.append( + project_dir + / "Image 1" + / f"Image 1_pmd_{n}" + / f"Image 1_pmd_{n}--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Image 1" / f"Image 1_pmd_{n}" / "Metadata" / "Image 1.xlif" + ) + + # Create junk image data + for n in range(5): + for c in range(3): + for z in range(10): + file_list.append( + 
project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / f"Position 1_pmd_{n}--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / "Metadata" + / "Position 1.xlif" + ) + + # Create remaining junk files + for file_path in ( + "1.xlef", + "Metadata/IOManagerConfiguation.xlif", + "Metadata/Overview 1.xlcf", + "Metadata/TileScan 1.xlcf", + "Overview 1/Image 1/Image 1_histo.lof", + "TileScan 1/Position 1/Position 1_histo.lof", + "Overview 1/Image 1/Metadata/Image 1_histo.xlif", + "TileScan 1/Position 1/Metadata/Position 1_histo.xlif", + ): + file_list.append(project_dir / file_path) + + # Create files and directories + for file in file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +clem_substrings_blacklist = { + "directories": [ + "_pmd_", + ], + "files": [ + ".xlef", + ".xlcf", + "_histo.lof", + "_histo.xlif", + "IOManagerConfiguation.xlif", + ], +} + +rsyncer_finalise_params_matrix: tuple[tuple[str, bool, bool], ...] = ( + # Workflow type | Use thread? | Use callback function? 
+ ("clem", False, False), + ("clem", False, True), + ("clem", True, False), + ("clem", True, True), +) + + +@pytest.mark.parametrize("test_params", rsyncer_finalise_params_matrix) +def test_rsyncer_finalise( + mocker: MockerFixture, + rsync_module: str, + mock_server_url: MagicMock, + clem_visit_dir: Path, + clem_test_files: list[Path], + clem_junk_files: list[Path], + test_params: tuple[str, bool, bool], +): + # Unpack test params + workflow_type, use_thread, use_callback = test_params + + # Create a test end time + timestamp = datetime.now() + + # Mock the class functions/attributes called by the 'finalise' class function + mock_queue = MagicMock() + mock_queue.put.return_value = None + + mock_transfer = mocker.patch.object(RSyncer, "_transfer") + mock_transfer.return_value = True + + mock_stop = mocker.patch.object(RSyncer, "stop") + mock_stop.return_value = None + + mock_process = mocker.patch.object(RSyncer, "_process") + mock_process.return_value = None + + mock_callback = MagicMock(return_value=None) + + # Initialise the RSyncer class based on the workflow type being tested + if workflow_type == "clem": + rsyncer = RSyncer( + basepath_local=clem_visit_dir / "images", + basepath_remote=Path(clem_visit_dir.name) / "images", + rsync_module=rsync_module, + server_url=mock_server_url, + stop_callback=dummy_callback, + substrings_blacklist=clem_substrings_blacklist, + end_time=timestamp, + ) + # Patch the 'queue' attribute with the mocked one + rsyncer.queue = mock_queue + + # Check the initial state of attributes that will be changed by 'finalise' + assert not rsyncer._remove_files + assert rsyncer._notify + assert rsyncer._end_time == timestamp + assert not rsyncer._finalising + assert not rsyncer._finalised + + # Run the 'finalise' class function with the workflow-specific paths + rsyncer.finalise( + thread=use_thread, + callback=mock_callback if use_callback else None, + ) + + # Check that attributes are set correctly at the start of the function + assert 
rsyncer._remove_files + assert not rsyncer._notify + assert rsyncer._end_time is None + assert rsyncer._finalising + + # Check that list of files to transfer doesn't include blacklisted files + if use_thread: + for file in clem_test_files: + mock_queue.put.assert_any_call(file) + else: + transfer_args = mock_transfer.call_args.args + assert sorted(transfer_args[0]) == sorted(clem_test_files) + + # Check that the blacklisted files no longer exist + for file in clem_junk_files: + assert not file.exists() + # Transfer is being mocked, so check that files to transfer are all present + for file in clem_test_files: + assert file.exists() + + # Check that stop was called the correct number of times depending on the setup + assert mock_stop.call_count == 2 if use_thread else 1 + + # Check that the RSyncer is marked as finalised at the end + assert rsyncer._finalised + + # Check that the callback was set at the end + if use_callback: + mock_callback.assert_called_once() From da01856fe400f3db64da034081919ac22bdb6ba3 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 6 Jan 2026 12:11:08 +0000 Subject: [PATCH 06/14] Removed vestigial rsync utils module and its associated test --- src/murfey/util/rsync.py | 172 --------------------------------------- tests/util/test_rsync.py | 133 ------------------------------ 2 files changed, 305 deletions(-) delete mode 100644 src/murfey/util/rsync.py delete mode 100644 tests/util/test_rsync.py diff --git a/src/murfey/util/rsync.py b/src/murfey/util/rsync.py deleted file mode 100644 index bcbb4b05..00000000 --- a/src/murfey/util/rsync.py +++ /dev/null @@ -1,172 +0,0 @@ -from __future__ import annotations - -import logging -import subprocess -from pathlib import Path -from typing import Callable, Dict, List, Optional, Tuple, Union - -from murfey.util import Processor -from murfey.util.file_monitor import Monitor - -logger = logging.getLogger("murfey.util.rsync") - - -class RsyncPipe(Processor): - def __init__( - self, - finaldir: Path, - 
name: str = "rsync_pipe", - root: Optional[Path] = None, - notify: Optional[Callable[[Path], Optional[dict]]] = None, - destination_structure: Optional[ - Callable[[Path, Path], Tuple[Path, str]] - ] = None, - ): - super().__init__(name=name) - self._finaldir = finaldir - self.failed: List[Path] = [] - self._failed_tmp: List[str] = [] - self._transferring = False - self.sent_bytes = 0 - self.received_bytes = 0 - self.byte_rate: float = 0 - self.total_size = 0 - self.runner_return: List[subprocess.CompletedProcess] = [] - self._root = root - self._sub_structure: Optional[Path] = None - self._notify = notify or (lambda f: None) - self._destination_structure = destination_structure - - def _process(self, retry: bool = True, **kwargs): - if isinstance(self._previous, Monitor) and self._previous.thread: - while self._previous.thread.is_alive(): - files_for_transfer = self._in.get() - if not files_for_transfer: - continue - self._run_rsync(self._previous.dir, files_for_transfer, retry=retry) - - def _run_rsync( - self, - root: Path, - files: List[Path], - retry: bool = True, - ): - """ - Run rsync -v on a list of files using subprocess. - - :param root: root path of files for transferring; structure below the root is preserved - :type root: pathlib.Path object - :param files: List of files to be transferred - :type files: list of strigs or pathlib.Path objects - :param destination: Directory that files are to be copied to. 
- :type destination: string or pathlib.Path object - :param retry: If True put failed files back into the queue to be consumed - :type retry: bool - """ - self._root = root - - def _structure(p: Path) -> Path: - return (p.relative_to(root)).parent - - divided_files: Dict[Path, List[Path]] = {} - for f in files: - s = _structure(f) - try: - divided_files[s].append(f) - except KeyError: - divided_files[s] = [f] - for s in divided_files.keys(): - if self._destination_structure: - for f in divided_files[s]: - self._sub_structure, new_file_name = self._destination_structure( - s, f - ) - self._single_rsync( - root, - self._sub_structure, - [f], - file_name=Path(new_file_name), - retry=retry, - ) - else: - self._sub_structure = s - self._single_rsync(root, s, divided_files[s], retry=retry) - - def _single_rsync( - self, - root: Path, - sub_struct: Union[str, Path], - sources: List[Path], - file_name: Optional[Path] = None, - retry: bool = True, - ): - cmd: List[str] = ["rsync", "-v"] - self._failed_tmp = [] - cmd.extend(str(f) for f in sources) - if file_name: - cmd.append(str(self._finaldir / sub_struct / file_name)) - else: - cmd.append(str(self._finaldir / sub_struct) + "/") - self._transferring = True - - runner = subprocess.run( - cmd, - capture_output=True, - ) - for line in runner.stdout.decode("utf-8", "replace").split("\n"): - self._parse_rsync_stdout(line) - for line in runner.stderr.decode("utf-8", "replace").split("\n"): - self._parse_rsync_stderr(line) - self.runner_return.append(runner) - self.failed.extend(root / sub_struct / f for f in self._failed_tmp) - if retry: - self._in.put(root / sub_struct / f for f in self._failed_tmp) - - def _parse_rsync_stdout(self, line: str): - """ - Parse rsync stdout to collect information such as the paths of transferred - files and the amount of data transferred. 
- - :param stdout: stdout of rsync process - :type stdout: bytes - """ - if self._transferring: - if line.startswith("sent"): - self._transferring = False - byte_info = line.split() - self.sent_bytes = int( - byte_info[byte_info.index("sent") + 1].replace(",", "") - ) - self.received_bytes = int( - byte_info[byte_info.index("received") + 1].replace(",", "") - ) - self.byte_rate = float( - byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") - ) - elif len(line.split()) == 1: - if self._root and self._sub_structure: - self._notify(self._finaldir / self._sub_structure / line) - self._out.put(self._root / self._sub_structure / line) - else: - logger.warning( - f"root or substructure not set for transfer of {line}" - ) - else: - if "total size" in line: - self.total_size = int(line.replace("total size", "").split()[1]) - - def _parse_rsync_stderr(self, line: str): - """ - Parse rsync stderr to collect information on any files that failed to transfer. - - :param stderr: stderr of rsync process - :type stderr: bytes - """ - if ( - line.startswith("rsync: link_stat") - or line.startswith("rsync: [sender] link_stat") - ) and "failed" in line: - failed_msg = line.split() - self._failed_tmp.append( - failed_msg[failed_msg.index("failed:") - 1].replace('"', "") - ) diff --git a/tests/util/test_rsync.py b/tests/util/test_rsync.py deleted file mode 100644 index 567c4a4a..00000000 --- a/tests/util/test_rsync.py +++ /dev/null @@ -1,133 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Tuple - -from murfey.util.file_monitor import Monitor -from murfey.util.rsync import RsyncPipe - - -def test_a_simple_rsync_instance(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01]) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == f01 - - -def 
test_rsync_multiple_files(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - f02 = tmp_path / "from" / "file02.txt" - f02.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01, f02]) - assert rp._out.qsize() == 2 - transferred = [rp._out.get()] - transferred.append(rp._out.get()) - assert len(transferred) == 2 - assert set(transferred) == {f01, f02} - - -def test_rsync_a_nonexistant_file(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - f02 = tmp_path / "from" / "file02.txt" - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01, f02], retry=False) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == f01 - assert len(rp.failed) == 1 - - -def test_rsync_instance_on_nested_directory_structure(tmp_path): - initial_dir = tmp_path / "from" / "nest" - initial_dir.mkdir(parents=True) - destination = tmp_path / "to" - destination.mkdir() - f01 = initial_dir / "file01.txt" - f01.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01]) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == f01 - assert not len(rp.failed) - assert (destination / "nest" / "file01.txt").exists() - - -def test_rsync_pipe_from_monitor(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert (destination / "file01.txt").exists() - - -def test_rsync_with_additional_structure_without_changing_file_name(tmp_path): - def _new_structure(s: Path, p: Path) -> 
Tuple[Path, str]: - new_name = p.name - new_dest = s / "extra" - return new_dest, new_name - - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - (destination / "extra").mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination, destination_structure=_new_structure) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert (destination / "extra" / "file01.txt").exists() - - -def test_rsync_with_changed_file_name(tmp_path): - def _new_structure(s: Path, p: Path) -> Tuple[Path, str]: - new_name = p.name.replace("01", "05") - return s, new_name - - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination, destination_structure=_new_structure) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert not (destination / "file01.txt").exists() - assert (destination / "file05.txt").exists() From 49713aa36cb4037c9db12c865e982b6ba7388861 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 6 Jan 2026 12:24:13 +0000 Subject: [PATCH 07/14] Moved 'status' class function so that it's right under '__init__', since it's used as an instance attribute --- src/murfey/client/rsync.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index c1610565..1b4d4507 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -108,6 +108,19 @@ def __init__( self._finalising = False self._finalised = False + @property + def status(self) -> str: + if self._stopping: + if self.thread.is_alive(): + return "stopping" + else: 
+ return "finished" + else: + if self.thread.is_alive(): + return "running" + else: + return "ready" + def __repr__(self) -> str: return f" str: - if self._stopping: - if self.thread.is_alive(): - return "stopping" - else: - return "finished" - else: - if self.thread.is_alive(): - return "running" - else: - return "ready" - def notify(self, *args, secondary: bool = False, **kwargs) -> None: if self._notify: super().notify(*args, secondary=secondary, **kwargs) From fe781861aeb62cdb2615f7f01f12261dbd995c14 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 6 Jan 2026 14:05:26 +0000 Subject: [PATCH 08/14] Added tests for the 'start', 'restart', 'stop', 'request_stop', 'enqueue', and 'flush_skipped' RSyncer class methods --- tests/client/test_rsync.py | 210 +++++++++++++++++++++++++++++++++++-- 1 file changed, 202 insertions(+), 8 deletions(-) diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py index 021535c1..51d49e4a 100644 --- a/tests/client/test_rsync.py +++ b/tests/client/test_rsync.py @@ -2,6 +2,7 @@ import threading from datetime import datetime from pathlib import Path +from unittest import mock from unittest.mock import MagicMock import pytest @@ -38,8 +39,6 @@ def test_rsyncer_initialises( # Assign values to parameters basepath_local = tmp_path / "local" basepath_remote = tmp_path / "remote" - do_transfer = True - remove_files = True # Create a test substrings blacklist dict substrings_blacklist = { @@ -58,8 +57,6 @@ def test_rsyncer_initialises( server_url=mock_server_url, stop_callback=dummy_callback, local=is_local, - do_transfer=do_transfer, - remove_files=remove_files, substrings_blacklist=substrings_blacklist, end_time=timestamp, ) @@ -71,14 +68,12 @@ def test_rsyncer_initialises( assert rsyncer._server_url == mock_server_url assert rsyncer._stop_callback == dummy_callback assert rsyncer._local == is_local - assert rsyncer._do_transfer == do_transfer - assert rsyncer._remove_files == remove_files + assert rsyncer._do_transfer + 
assert not rsyncer._remove_files assert rsyncer._required_substrings_for_removal == [] assert rsyncer._substrings_blacklist == substrings_blacklist assert rsyncer._notify assert rsyncer._end_time == timestamp - assert not rsyncer._finalising - assert not rsyncer._finalised assert rsyncer._skipped_files == [] assert ( rsyncer._remote == str(basepath_remote) @@ -91,6 +86,145 @@ def test_rsyncer_initialises( assert isinstance(rsyncer.thread, threading.Thread) assert not rsyncer._stopping assert not rsyncer._halt_thread + assert not rsyncer._finalising + assert not rsyncer._finalised + + assert rsyncer.status == "ready" + + # Check that it's represented correctly + assert ( + str(rsyncer) + == f" Date: Tue, 6 Jan 2026 14:23:28 +0000 Subject: [PATCH 09/14] Added test for RSyncer 'status' property --- tests/client/test_rsync.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py index 51d49e4a..287545b0 100644 --- a/tests/client/test_rsync.py +++ b/tests/client/test_rsync.py @@ -89,12 +89,43 @@ def test_rsyncer_initialises( assert not rsyncer._finalising assert not rsyncer._finalised - assert rsyncer.status == "ready" + +@pytest.mark.parametrize( + "test_params", + ( + # Is stopping? | Is thread alive? 
| Expected status + (False, False, "ready"), + (False, True, "running"), + (True, True, "stopping"), + (True, False, "finished"), + ), +) +def test_rsyncer_status( + tmp_path: Path, + mock_server_url: MagicMock, + test_params: tuple[bool, bool, str], +): + # Unpack test params + is_stopping, is_thread_alive, expected_status = test_params + + # Mock the thread + mock_thread = MagicMock() + mock_thread.is_alive.return_value = is_thread_alive + + # Initialise the RSyncer and patch the attributes to be tested + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.thread = mock_thread + rsyncer._stopping = is_stopping # Check that it's represented correctly assert ( str(rsyncer) - == f" Date: Tue, 6 Jan 2026 14:55:29 +0000 Subject: [PATCH 10/14] Added test for 'notify' RSyncer class method --- tests/client/test_rsync.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py index 287545b0..02e809ef 100644 --- a/tests/client/test_rsync.py +++ b/tests/client/test_rsync.py @@ -129,6 +129,42 @@ def test_rsyncer_status( ) +@pytest.mark.parametrize("notify", (True, False)) +def test_rsyncer_notify( + mocker: MockerFixture, + tmp_path: Path, + mock_server_url: MagicMock, + notify: bool, +): + # Patch the superclass that RSyncer stems from + mock_notify = mocker.patch("murfey.client.rsync.Observer.notify") + mock_notify.return_value = None + + # Initialise the RSyncer + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + notify=notify, + ) + # Check that the 'notify' attribute is set correctly + assert rsyncer._notify == notify + + # Run 'notify' and check that the expected calls were made + rsyncer.notify("arg1", "arg2", kwarg1="kwarg1", kwarg2="kwarg2") + if notify: + 
mock_notify.assert_called_once_with( + "arg1", + "arg2", + secondary=False, + kwarg1="kwarg1", + kwarg2="kwarg2", + ) + else: + mock_notify.assert_not_called() + + @pytest.mark.parametrize("rsyncer_status", ("default", "is_alive", "stopping")) def test_rsyncer_start( tmp_path: Path, From 44470095f45c3d9ee57688ad0743c6ab14dea9fa Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 6 Jan 2026 17:01:35 +0000 Subject: [PATCH 11/14] Use 'pattern' instead of 'char' when iterating over items in substrings blacklist --- src/murfey/client/rsync.py | 8 ++++---- src/murfey/client/watchdir.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 1b4d4507..7cd1f74d 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -207,8 +207,8 @@ def recursive_cleanup(dirpath: str | Path): if entry.is_dir(): # Recursively delete directories with blacklisted substrings if any( - char in entry.name - for char in self._substrings_blacklist.get("directories", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("directories", []) ): logger.debug(f"Deleting blacklisted directory {entry.path}") shutil.rmtree(entry.path) @@ -218,8 +218,8 @@ def recursive_cleanup(dirpath: str | Path): elif entry.is_file(): # Delete blacklisted files if any( - char in entry.name - for char in self._substrings_blacklist.get("files", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("files", []) ): logger.debug(f"Deleting blacklisted file {entry.path}") Path(entry.path).unlink() diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index fdb3d363..24abe029 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -247,8 +247,8 @@ def _scan_directory( entry_name = os.path.join(path, entry.name) # Skip any directories with matching blacklisted substrings if entry.is_dir() and any( - char in entry.name - for char in 
self._substrings_blacklist.get("directories", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("directories", []) ): log.debug(f"Skipping blacklisted directory {str(entry.name)!r}") continue @@ -262,8 +262,8 @@ def _scan_directory( continue # Exclude files with blacklisted substrings if any( - char in entry.name - for char in self._substrings_blacklist.get("files", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("files", []) ): log.debug(f"Skipping blacklisted file {str(entry.name)!r}") continue From 3e6f0a688d9c128a1a571e5e6dcaeb179fc62dd0 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 7 Jan 2026 09:30:02 +0000 Subject: [PATCH 12/14] Remove status information from canonical representation of RSyncer --- src/murfey/client/rsync.py | 2 +- tests/client/test_rsync.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index 7cd1f74d..38dbee5e 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -122,7 +122,7 @@ def status(self) -> str: return "ready" def __repr__(self) -> str: - return f"" @classmethod def from_rsyncer(cls, rsyncer: RSyncer, **kwargs): diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py index 02e809ef..b9145fb3 100644 --- a/tests/client/test_rsync.py +++ b/tests/client/test_rsync.py @@ -122,11 +122,11 @@ def test_rsyncer_status( rsyncer.thread = mock_thread rsyncer._stopping = is_stopping - # Check that it's represented correctly - assert ( - str(rsyncer) - == f"" @pytest.mark.parametrize("notify", (True, False)) From e8859a463bb689ba0bc69a950b93a6970552433f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 7 Jan 2026 09:38:54 +0000 Subject: [PATCH 13/14] Updated RSyncer logs to include name of RSyncer --- src/murfey/client/rsync.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py 
index 38dbee5e..deae2f44 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -171,7 +171,7 @@ def restart(self): self.start() def stop(self): - logger.info("RSync thread stop requested") + logger.info(f"Stopping RSync thread {self}") self._stopping = True if self.thread.is_alive(): logger.info("Waiting for ongoing transfers to complete...") @@ -181,7 +181,7 @@ def stop(self): if self.thread.is_alive(): self.queue.put(None) self.thread.join() - logger.info("RSync thread successfully stopped") + logger.info(f"RSync thread {self} successfully stopped") def request_stop(self): self._stopping = True @@ -199,7 +199,7 @@ def finalise( self._finalising = True # Perform recursive cleanup on current directory - logger.info("File cleanup started for RSync thread") + logger.info(f"Starting file cleanup for RSync thread {self}") files_to_transfer: list[Path] = [] def recursive_cleanup(dirpath: str | Path): @@ -242,7 +242,7 @@ def recursive_cleanup(dirpath: str | Path): else: self._transfer(files_to_transfer) self._finalised = True - logger.info("File cleanup for RSync thread successfully completed") + logger.info(f"File cleanup for RSync thread {self} successfully completed") if callback: callback() @@ -257,7 +257,7 @@ def flush_skipped(self): self._skipped_files = [] def _process(self): - logger.info("Starting RSync thread main process loop") + logger.info(f"Starting main process loop for RSync thread {self}") files_to_transfer: list[Path] backoff = 0 while not self._halt_thread: From 87d92aaab4bf50bc059e67735f04508ba7557679 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 7 Jan 2026 09:55:14 +0000 Subject: [PATCH 14/14] Added test iterations to check that file cleanup and transfer with and without setting a substrings blacklist are as expected --- tests/client/test_rsync.py | 45 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py index 
b9145fb3..96e1e4fb 100644 --- a/tests/client/test_rsync.py +++ b/tests/client/test_rsync.py @@ -414,16 +414,21 @@ def clem_junk_files(clem_visit_dir: Path): ], } -rsyncer_finalise_params_matrix: tuple[tuple[str, bool, bool], ...] = ( - # Workflow type | Use thread? | Use callback function? - ("clem", False, False), - ("clem", False, True), - ("clem", True, False), - ("clem", True, True), -) - -@pytest.mark.parametrize("test_params", rsyncer_finalise_params_matrix) +@pytest.mark.parametrize( + "test_params", + ( + # Workflow type | Use thread? | Use callback function? | Use blacklist? + ("clem", False, False, False), + ("clem", False, False, True), + ("clem", False, True, False), + ("clem", False, True, True), + ("clem", True, False, False), + ("clem", True, False, True), + ("clem", True, True, False), + ("clem", True, True, True), + ), +) def test_rsyncer_finalise( mocker: MockerFixture, rsync_module: str, @@ -431,10 +436,10 @@ def test_rsyncer_finalise( clem_visit_dir: Path, clem_test_files: list[Path], clem_junk_files: list[Path], - test_params: tuple[str, bool, bool], + test_params: tuple[str, bool, bool, bool], ): # Unpack test params - workflow_type, use_thread, use_callback = test_params + workflow_type, use_thread, use_callback, use_blacklist = test_params # Create a test end time timestamp = datetime.now() @@ -462,7 +467,7 @@ def test_rsyncer_finalise( rsync_module=rsync_module, server_url=mock_server_url, stop_callback=dummy_callback, - substrings_blacklist=clem_substrings_blacklist, + substrings_blacklist=clem_substrings_blacklist if use_blacklist else {}, end_time=timestamp, ) # Patch the 'queue' attribute with the mocked one @@ -487,20 +492,26 @@ def test_rsyncer_finalise( assert rsyncer._end_time is None assert rsyncer._finalising - # Check that list of files to transfer doesn't include blacklisted files + # Check that list of files with and without using a blacklist are correct if use_thread: for file in clem_test_files: 
mock_queue.put.assert_any_call(file) + if not use_blacklist: + for file in clem_junk_files: + mock_queue.put.assert_any_call(file) else: transfer_args = mock_transfer.call_args.args - assert sorted(transfer_args[0]) == sorted(clem_test_files) + assert sorted(transfer_args[0]) == ( + sorted(clem_test_files) + if use_blacklist + else sorted([*clem_test_files, *clem_junk_files]) + ) - # Check that the blacklisted files no longer exist - for file in clem_junk_files: - assert not file.exists() # Transfer is being mocked, so check that files to transfer are all present for file in clem_test_files: assert file.exists() + for file in clem_junk_files: + assert not file.exists() if use_blacklist else file.exists() # Check that stop was called the correct number of times depending on the setup assert mock_stop.call_count == 2 if use_thread else 1