diff --git a/src/virtualship/cli/_run.py b/src/virtualship/cli/_run.py index f07fbab27..d2934e59d 100644 --- a/src/virtualship/cli/_run.py +++ b/src/virtualship/cli/_run.py @@ -1,5 +1,6 @@ """do_expedition function.""" +import glob import logging import os import shutil @@ -14,11 +15,13 @@ ScheduleProblem, simulate_schedule, ) -from virtualship.models import Schedule -from virtualship.models.checkpoint import Checkpoint +from virtualship.make_realistic.problems.simulator import ProblemSimulator +from virtualship.models import Checkpoint, Schedule from virtualship.utils import ( CHECKPOINT, + PROBLEMS_ENCOUNTERED_DIR, _get_expedition, + _save_checkpoint, expedition_cost, get_instrument_class, ) @@ -35,7 +38,10 @@ logging.getLogger("copernicusmarine").setLevel("ERROR") -def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: +# TODO: prob-level needs to be parsed from CLI args; currently set to 1 override for testing purposes +def _run( + expedition_dir: str | Path, from_data: Path | None = None, prob_level: int = 1 +) -> None: """ Perform an expedition, providing terminal feedback and file output. 
@@ -73,7 +79,7 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: expedition = _get_expedition(expedition_dir) - # Verify instruments_config file is consistent with schedule + # verify instruments_config file is consistent with schedule expedition.instruments_config.verify(expedition) # load last checkpoint @@ -81,8 +87,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: if checkpoint is None: checkpoint = Checkpoint(past_schedule=Schedule(waypoints=[])) - # verify that schedule and checkpoint match - checkpoint.verify(expedition.schedule) + # verify that schedule and checkpoint match, and that problems have been resolved + checkpoint.verify(expedition.schedule, expedition_dir) print("\n---- WAYPOINT VERIFICATION ----") @@ -96,17 +102,16 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: projection=projection, expedition=expedition, ) + + # handle cases where user defined schedule is incompatible (i.e. not enough time between waypoints, not problems) if isinstance(schedule_results, ScheduleProblem): print( f"SIMULATION PAUSED: update your schedule (`virtualship plan`) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." 
) _save_checkpoint( Checkpoint( - past_schedule=Schedule( - waypoints=expedition.schedule.waypoints[ - : schedule_results.failed_waypoint_i - ] - ) + past_schedule=expedition.schedule, + failed_waypoint_i=schedule_results.failed_waypoint_i, ), expedition_dir, ) @@ -124,12 +129,30 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("\n--- MEASUREMENT SIMULATIONS ---") + # identify problems + # TODO: prob_level needs to be parsed from CLI args + problem_simulator = ProblemSimulator( + expedition.schedule, prob_level, expedition_dir + ) + problems = problem_simulator.select_problems() + # simulate measurements print("\nSimulating measurements. This may take a while...\n") + # TODO: logic for getting simulations to carry on from last checkpoint! Building on .zarr files already created... + instruments_in_expedition = expedition.get_instruments() for itype in instruments_in_expedition: + #! TODO: move this to before the loop; determine problem selection based on instruments_in_expedition to ensure only relevant problems are selected; and then instrument problems are propagated to within the loop + # TODO: instrument-specific problems at different waypoints are where see if can get time savings by not re-simulating everything from scratch... but if it's too complex than just leave for now + # propagate problems + if problems: + problem_simulator.execute( + problems=problems, + instrument_type=itype, + ) + # get instrument class instrument_class = get_instrument_class(itype) if instrument_class is None: @@ -158,6 +181,15 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print( f"Your measurements can be found in the '{expedition_dir}/results' directory." ) + + # TODO: delete checkpoint file at the end of successful expedition? 
def _load_hashes(expedition_dir: Path) -> set[str]:
    """
    Return the hashes of all problems already recorded for this expedition.

    Problem records are stored as ``problem_<hash>.json`` files inside the
    ``PROBLEMS_ENCOUNTERED_DIR`` sub-directory of the expedition directory.

    :param expedition_dir: Root directory of the expedition.
    :return: Set of problem hashes; empty if no problem has been recorded yet.
    """
    problems_dir = Path(expedition_dir).joinpath(PROBLEMS_ENCOUNTERED_DIR)
    if not problems_dir.is_dir():
        return set()

    # The records are JSON files (not .txt), so that is the pattern to look for.
    return {
        record.stem.removeprefix("problem_")
        for record in problems_dir.glob("problem_*.json")
    }
# =====================================================
# SECTION: Base Classes
# =====================================================


@dataclass
class GeneralProblem(abc.ABC):
    """
    Base class for general (non instrument-specific) problems.

    Concrete problems are dataclasses that give every field a default, so the
    values can be read from the class itself (``Problem.message``) as well as from
    an instance (``Problem().message``).
    """

    message: str  # text shown to the user when the problem occurs
    can_reoccur: bool  # whether the problem may occur more than once per expedition
    base_probability: float  # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints)
    delay_duration: timedelta  # delay the problem adds to the schedule
    pre_departure: bool  # True if problem occurs before expedition departure, False if during expedition

    def is_valid(self, waypoint: Waypoint) -> bool:
        """
        Check if the problem can occur based on e.g. waypoint location and/or datetime etc.

        The default places no restriction; subclasses override this to limit where
        or when the problem may occur.
        """
        return True


@dataclass
class InstrumentProblem(abc.ABC):
    """
    Base class for instrument-specific problems.

    As with general problems, concrete subclasses give every field a default.
    """

    instrument_type: InstrumentType  # instrument the problem applies to
    message: str  # text shown to the user when the problem occurs
    can_reoccur: bool  # whether the problem may occur more than once per expedition
    base_probability: float  # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints)
    delay_duration: timedelta  # delay the problem adds to the schedule
    pre_departure: bool  # True if problem can occur before expedition departure, False if during expedition

    def is_valid(self, waypoint: Waypoint) -> bool:
        """
        Check if the problem can occur based on e.g. waypoint location and/or datetime etc.

        The default places no restriction; subclasses override this to limit where
        or when the problem may occur.
        """
        return True


# =====================================================
# SECTION: General Problems
# =====================================================


# @register_general_problem
@dataclass
class FoodDeliveryDelayed(GeneralProblem):
    """Problem: Scheduled food delivery is delayed, causing a postponement of departure."""

    message: str = (
        "The scheduled food delivery prior to departure has not arrived. Until the supply truck reaches the pier, "
        "we cannot leave. Once it arrives, unloading and stowing the provisions in the ship’s cold storage "
        "will also take additional time. These combined delays postpone departure by approximately 5 hours."
    )
    can_reoccur: bool = False
    delay_duration: timedelta = timedelta(hours=5.0)
    base_probability: float = 0.1
    pre_departure: bool = True


# @register_general_problem
@dataclass
class VenomousCentipedeOnboard(GeneralProblem):
    """Problem: Venomous centipede discovered onboard in tropical waters."""

    message: str = (
        "A venomous centipede is discovered onboard while operating in tropical waters. "
        "One crew member becomes ill after contact with the creature and receives medical attention, "
        "prompting a full search of the vessel to ensure no further danger. "
        "The medical response and search efforts cause an operational delay of about 2 hours."
    )
    can_reoccur: bool = False
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.05
    pre_departure: bool = False

    def is_valid(self, waypoint: Waypoint) -> bool:
        """Check if the waypoint is in tropical waters."""
        lat_limit = 23.5  # [degrees]
        # NOTE(review): assumes Waypoint exposes `latitude` directly - confirm
        # against the Waypoint model (it may live on `waypoint.location`).
        return abs(waypoint.latitude) <= lat_limit


# @register_general_problem
@dataclass
class CaptainSafetyDrill(GeneralProblem):
    """Problem: Sudden initiation of a mandatory safety drill."""

    message: str = (
        "A miscommunication with the ship’s captain results in the sudden initiation of a mandatory safety drill. "
        "The emergency vessel must be lowered and tested while the ship remains stationary, pausing all scientific "
        "operations for the duration of the exercise. The drill introduces a delay of approximately 2 hours."
    )
    can_reoccur: bool = False
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.1
    pre_departure: bool = False


@dataclass
class FuelDeliveryIssue:
    """
    Problem: Fuel tanker has not arrived; the user chooses how to proceed.

    Not (yet) part of the GeneralProblem hierarchy: the delay depends on the
    choices made rather than being a fixed duration.
    """

    message = (
        "The fuel tanker expected to deliver fuel has not arrived. Port authorities are unable to provide "
        "a clear estimate for when the delivery might occur. You may choose to [w]ait for the tanker or [g]et a "
        "harbor pilot to guide the vessel to an available bunker dock instead. This decision may need to be "
        "revisited periodically depending on circumstances."
    )
    can_reoccur: bool = False
    delay_duration: float = 0.0  # dynamic delays based on repeated choices


@dataclass
class EngineOverheating:
    """
    Problem: A main engine overheats, forcing a reduced cruising speed.

    Not (yet) part of the GeneralProblem hierarchy: it changes the ship speed
    instead of adding a fixed delay.
    """

    message = (
        "One of the main engines has overheated. To prevent further damage, the engineering team orders a reduction "
        "in vessel speed until the engine can be inspected and repaired in port. The ship will now operate at a "
        "reduced cruising speed of 8.5 knots for the remainder of the transit."
    )
    can_reoccur: bool = False
    delay_duration: None = None  # speed reduction affects ETA instead of fixed delay
    ship_speed_knots: float = 8.5


# @register_general_problem
@dataclass
class MarineMammalInDeploymentArea(GeneralProblem):
    """Problem: Marine mammals observed in deployment area, causing delay."""

    message: str = (
        "A pod of dolphins is observed swimming directly beneath the planned deployment area. "
        "To avoid risk to wildlife and comply with environmental protocols, all in-water operations "
        "must pause until the animals move away from the vicinity. This results in a delay of about 30 minutes."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=0.5)
    base_probability: float = 0.1
    pre_departure: bool = False  # previously unset; assumed to occur during the expedition


# @register_general_problem
@dataclass
class BallastPumpFailure(GeneralProblem):
    """Problem: Ballast pump failure during ballasting operations."""

    message: str = (
        "One of the ship’s ballast pumps suddenly stops responding during routine ballasting operations. "
        "Without the pump, the vessel cannot safely adjust trim or compensate for equipment movements on deck. "
        "Engineering isolates the faulty pump and performs a rapid inspection. Temporary repairs allow limited "
        "functionality, but the interruption causes a delay of approximately 1 hour."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=1.0)
    base_probability: float = 0.1
    pre_departure: bool = False  # previously unset; assumed to occur during the expedition


# @register_general_problem
@dataclass
class ThrusterConverterFault(GeneralProblem):
    """Problem: Bow thruster's power converter fault during station-keeping."""

    message: str = (
        "The bow thruster's power converter reports a fault during station-keeping operations. "
        "Dynamic positioning becomes less stable, forcing a temporary suspension of high-precision sampling. "
        "Engineers troubleshoot the converter and perform a reset, resulting in a delay of around 1 hour."
    )
    can_reoccur: bool = False
    delay_duration: timedelta = timedelta(hours=1.0)
    base_probability: float = 0.1
    pre_departure: bool = False  # previously unset; assumed to occur during the expedition


# @register_general_problem
@dataclass
class AFrameHydraulicLeak(GeneralProblem):
    """Problem: Hydraulic fluid leak from A-frame actuator."""

    message: str = (
        "A crew member notices hydraulic fluid leaking from the A-frame actuator during equipment checks. "
        "The leak must be isolated immediately to prevent environmental contamination or mechanical failure. "
        "Engineering replaces a faulty hose and repressurizes the system. This repair causes a delay of about 2 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.1
    pre_departure: bool = False  # previously unset; assumed to occur during the expedition


# @register_general_problem
@dataclass
class CoolingWaterIntakeBlocked(GeneralProblem):
    """Problem: Main engine's cooling water intake blocked."""

    message: str = (
        "The main engine's cooling water intake alarms indicate reduced flow, likely caused by marine debris "
        "or biological fouling. The vessel must temporarily slow down while engineering clears the obstruction "
        "and flushes the intake. This results in a delay of approximately 1 hour."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=1.0)
    base_probability: float = 0.1
    pre_departure: bool = False  # previously unset; assumed to occur during the expedition


# =====================================================
# SECTION: Instrument-specific Problems
# =====================================================
# NOTE(review): none of the instrument problems set `pre_departure` originally;
# False (i.e. occurring during the expedition) is assumed below - confirm.


# @register_instrument_problem(InstrumentType.CTD)
@dataclass
class CTDCableJammed(InstrumentProblem):
    """Problem: CTD cable jammed in winch drum, requiring replacement."""

    message: str = (
        "During preparation for the next CTD cast, the CTD cable becomes jammed in the winch drum. "
        "Attempts to free it are unsuccessful, and the crew determines that the entire cable must be "
        "replaced before deployment can continue. This repair is time-consuming and results in a delay "
        "of approximately 3 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=3.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.CTD


# @register_instrument_problem(InstrumentType.ADCP)
@dataclass
class ADCPMalfunction(InstrumentProblem):
    """Problem: ADCP returns invalid data, requiring inspection."""

    message: str = (
        "The hull-mounted ADCP begins returning invalid velocity data. Engineering suspects damage to the cable "
        "from recent maintenance activities. The ship must hold position while a technician enters the cable "
        "compartment to perform an inspection and continuity test. This diagnostic procedure results in a delay "
        "of around 1 hour."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=1.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.ADCP


# @register_instrument_problem(InstrumentType.CTD)
@dataclass
class CTDTemperatureSensorFailure(InstrumentProblem):
    """Problem: CTD temperature sensor failure, requiring replacement."""

    message: str = (
        "The primary temperature sensor on the CTD begins returning inconsistent readings. "
        "Troubleshooting confirms that the sensor has malfunctioned. A spare unit can be installed, "
        "but integrating and verifying the replacement will pause operations. "
        "This procedure leads to an estimated delay of around 2 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.CTD


# @register_instrument_problem(InstrumentType.CTD)
@dataclass
class CTDSalinitySensorFailureWithCalibration(InstrumentProblem):
    """Problem: CTD salinity sensor failure, requiring replacement and calibration."""

    message: str = (
        "The CTD’s primary salinity sensor fails and must be replaced with a backup. After installation, "
        "a mandatory calibration cast to a minimum depth of 1000 meters is required to verify sensor accuracy. "
        "Both the replacement and calibration activities result in a total delay of roughly 4 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=4.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.CTD


# @register_instrument_problem(InstrumentType.CTD)
@dataclass
class WinchHydraulicPressureDrop(InstrumentProblem):
    """Problem: CTD winch hydraulic pressure drop, requiring repair."""

    message: str = (
        "The CTD winch begins to lose hydraulic pressure during routine checks prior to deployment. "
        "The engineering crew must stop operations to diagnose the hydraulic pump and replenish or repair "
        "the system. Until pressure is restored to operational levels, the winch cannot safely be used. "
        "This results in an estimated delay of 1.5 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=1.5)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.CTD


# @register_instrument_problem(InstrumentType.CTD)
@dataclass
class RosetteTriggerFailure(InstrumentProblem):
    """Problem: CTD rosette trigger failure, requiring inspection."""

    message: str = (
        "During a CTD cast, the rosette's bottle-triggering mechanism fails to actuate. "
        "No discrete water samples can be collected during this cast. The rosette must be brought back "
        "on deck for inspection and manual testing of the trigger system. This results in an operational "
        "delay of approximately 2.5 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=2.5)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.CTD


# @register_instrument_problem(InstrumentType.DRIFTER)
@dataclass
class DrifterSatelliteConnectionDelay(InstrumentProblem):
    """Problem: Drifter fails to establish satellite connection before deployment."""

    message: str = (
        "The drifter scheduled for deployment fails to establish a satellite connection during "
        "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck "
        "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay "
        "of approximately 2 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.DRIFTER


# @register_instrument_problem(InstrumentType.ARGO_FLOAT)
@dataclass
class ArgoSatelliteConnectionDelay(InstrumentProblem):
    """Problem: Argo float fails to establish satellite connection before deployment."""

    message: str = (
        "The Argo float scheduled for deployment fails to establish a satellite connection during "
        "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck "
        "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay "
        "of approximately 2 hours."
    )
    can_reoccur: bool = True
    delay_duration: timedelta = timedelta(hours=2.0)
    base_probability: float = 0.1
    pre_departure: bool = False
    instrument_type: InstrumentType = InstrumentType.ARGO_FLOAT
LOG_MESSAGING = {
    "pre_departure": "Hang on! There could be a pre-departure problem in-port...",
    "during_expedition": "Oh no, a problem has occurred during the expedition, at waypoint {waypoint_i}...!",
    "simulation_paused": "Please update your schedule (`virtualship plan` or directly in {expedition_yaml}) to account for the delay at waypoint {waypoint_i} and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {checkpoint_path}.\n",
    "problem_avoided": "Phew! You had enough contingency time scheduled to avoid delays from this problem. The expedition can carry on shortly...\n",
    "pre_departure_delay": "This problem will cause a delay of {delay_duration} hours to the whole expedition schedule. Please account for this for all waypoints in your schedule (`virtualship plan` or directly in {expedition_yaml}), then continue the expedition by executing the `virtualship run` command again.\n",
}


class ProblemSimulator:
    """Handle problem simulation during expedition."""

    def __init__(self, schedule: Schedule, prob_level: int, expedition_dir: str | Path):
        """Initialise ProblemSimulator with a schedule and probability level."""
        self.schedule = schedule
        self.prob_level = prob_level
        self.expedition_dir = Path(expedition_dir)

    def select_problems(
        self,
    ) -> dict[str, list[type[GeneralProblem] | type[InstrumentProblem]]] | None:
        """
        Select the general and instrument problems for this expedition.

        :return: Dict with the keys "general" and "instrument", each mapping to a
            list of problem classes, or None when problems are disabled
            (probability of zero, i.e. prob_level 0).
        """
        # TODO: whether a problem can reoccur or not needs to be handled here too!
        probability = self._calc_prob()
        if probability <= 0.0:
            return None
        return {
            "general": self._general_problem_select(probability),
            "instrument": self._instrument_problem_select(probability),
        }

    def execute(
        self,
        problems: dict[str, list[type[GeneralProblem] | type[InstrumentProblem]]],
        instrument_type: InstrumentType | None = None,
        log_delay: float = 7.0,
    ):
        """
        Execute the selected problems, logging them and pausing the simulation if needed.

        N.B. the problem_waypoint_i is different to the failed_waypoint_i defined in the
        Checkpoint class; the failed_waypoint_i is the waypoint index after the
        problem_waypoint_i where the problem occurred, as this is when scheduling issues
        would be encountered.

        :param problems: Output of `select_problems`.
        :param instrument_type: Instrument currently being simulated (not used yet;
            instrument-specific problems are still to be implemented).
        :param log_delay: Seconds the alert spinner is shown before the problem is revealed.
        """
        # TODO: integration with which zarr files have been written so far?
        # TODO: prob levels: 0 = no problems; 1 = only one problem in expedition (default);
        #       2 = multiple problems can occur, but only one pre-departure problem allowed.
        # TODO: handle problems that can/cannot reoccur, and problems that may only reoccur
        #       at a different waypoint to the one at which they already occurred.
        # TODO: there is no logic yet limiting the total number of problems per expedition;
        #       a new waypoint/problem combination can occur every time the expedition is run.

        # allow only one pre-departure problem to occur
        general_problems = list(problems["general"])
        pre_departure_problems = [p for p in general_problems if p.pre_departure]
        if len(pre_departure_problems) > 1:
            to_keep = random.choice(pre_departure_problems)
            general_problems = [
                p for p in general_problems if not p.pre_departure or p is to_keep
            ]
        # ensure any pre-departure problem is first; `sorted` leaves the caller's list untouched
        general_problems = sorted(
            general_problems, key=lambda p: p.pre_departure, reverse=True
        )

        n_waypoints = len(self.schedule.waypoints)
        problems_dir = self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR

        # TODO: make the log output stand out more visually
        for gproblem in general_problems:
            # determine problem waypoint index (random if during expedition)
            if gproblem.pre_departure:
                problem_waypoint_i = None
            elif n_waypoints < 2:
                # a problem at the last (only) waypoint cannot affect future scheduling
                continue
            else:
                # last waypoint excluded (would not impact any future scheduling);
                # cast to a builtin int so the index can be written to JSON
                problem_waypoint_i = int(np.random.randint(0, n_waypoints - 1))

            # mark problem by unique hash and log to json, use to assess whether problem has already occurred
            gproblem_hash = self._make_hash(
                gproblem.message + str(problem_waypoint_i), 8
            )
            hash_path = problems_dir / f"problem_{gproblem_hash}.json"
            if hash_path.exists():
                continue  # problem * waypoint combination has already occurred; don't repeat
            self._hash_to_json(gproblem, gproblem_hash, problem_waypoint_i, hash_path)

            if gproblem.pre_departure:
                alert_msg = LOG_MESSAGING["pre_departure"]
            else:
                alert_msg = LOG_MESSAGING["during_expedition"].format(
                    waypoint_i=problem_waypoint_i + 1
                )

            # log problem occurrence, save to checkpoint, and pause simulation
            self._log_problem(gproblem, problem_waypoint_i, alert_msg, log_delay)

        # TODO: implement instrument-specific problems (problems["instrument"]),
        #       with similar logic to the general problems above (or combined with it).

    def _propagate_general_problems(self):
        """Propagate general problems based on probability."""
        return self._general_problem_select(self._calc_prob())

    def _propagate_instrument_problems(self):
        """Propagate instrument problems based on probability."""
        return self._instrument_problem_select(self._calc_prob())

    def _calc_prob(self) -> float:
        """
        Calculate the probability of a problem occurring.

        Placeholder: returns 0.0 for prob_level 0 (problems disabled) and 1.0 for any
        other level, until the dependence on expedition duration is implemented.

        TODO: for now, general and instrument-specific problems have the same probability of
        occurrence. Separating this out and allowing their probabilities to be set
        independently may be useful in future.
        """
        if self.prob_level == 0:
            return 0.0
        return 1.0

    def _general_problem_select(self, probability) -> list[type[GeneralProblem]]:
        """Select which problems. Higher probability (tied to expedition duration) means more problems are likely to occur."""
        return [
            CaptainSafetyDrill,
        ]  # TODO: temporary placeholder!!

    def _instrument_problem_select(
        self, probability
    ) -> list[type[InstrumentProblem]]:
        """Select which problems. Higher probability (tied to expedition duration) means more problems are likely to occur."""
        # TODO: intersect the instruments used at the waypoints with the registry of
        #       instrument-specific problems to determine which problems are possible.
        return [CTDCableJammed]

    def _log_problem(
        self,
        problem: type[GeneralProblem] | type[InstrumentProblem],
        problem_waypoint_i: int | None,
        alert_msg: str,
        log_delay: float,
    ):
        """
        Log problem occurrence with spinner and delay, save a checkpoint and pause the simulation.

        Returns normally only when enough contingency time was scheduled to absorb the
        delay; otherwise the process exits (status 0) after the checkpoint is saved.

        :param problem: The problem that occurred.
        :param problem_waypoint_i: Index of the waypoint at which the problem occurs,
            or None for a pre-departure problem.
        :param alert_msg: Text shown next to the spinner.
        :param log_delay: Seconds the spinner is shown.
        """
        delay_hours = problem.delay_duration.total_seconds() / 3600.0

        time.sleep(3.0)  # brief pause before spinner
        with yaspin(text=alert_msg) as spinner:
            time.sleep(log_delay)
            spinner.ok("💥 ")

        print("\nPROBLEM ENCOUNTERED: " + problem.message + "\n")

        if problem_waypoint_i is None:  # pre-departure problem
            print(
                "\nRESULT: "
                + LOG_MESSAGING["pre_departure_delay"].format(
                    delay_duration=delay_hours,
                    expedition_yaml=EXPEDITION,
                )
            )
            # Checkpoint treats a failed waypoint of None as a pre-departure problem
            failed_waypoint_i = None

        else:  # problem occurring during expedition
            result_msg = "\nRESULT: " + LOG_MESSAGING["simulation_paused"].format(
                waypoint_i=int(problem_waypoint_i) + 1,
                expedition_yaml=EXPEDITION,
                checkpoint_path=self.expedition_dir.joinpath(CHECKPOINT),
            )

            # the first waypoint is handled separately: the simulation always pauses
            if problem_waypoint_i == 0:
                print(result_msg)

            else:
                # check if enough contingency time has been scheduled to avoid delay affecting future waypoints
                with yaspin(text="Assessing impact on expedition schedule..."):
                    time.sleep(5.0)
                waypoints = self.schedule.waypoints
                time_diff = (
                    waypoints[problem_waypoint_i + 1].time
                    - waypoints[problem_waypoint_i].time
                ).total_seconds() / 3600.0  # [hours]

                if time_diff >= delay_hours:
                    print(LOG_MESSAGING["problem_avoided"])
                    # give users time to read message before simulation continues
                    with yaspin():
                        time.sleep(7.0)
                    # NOTE(review): the JSON record of this problem stays "resolved": False
                    # on this path - confirm Checkpoint.verify copes with that.
                    return

                print(
                    f"\nNot enough contingency time scheduled to mitigate delay of {delay_hours} hours occuring at waypoint {problem_waypoint_i + 1} (future waypoints would be reached too late).\n"
                )
                print(result_msg)

            # failed waypoint index is the one after the one where the problem occurred;
            # this is when scheduling issues would be run into
            failed_waypoint_i = int(problem_waypoint_i) + 1

        # save checkpoint
        checkpoint = self._make_checkpoint(failed_waypoint_i=failed_waypoint_i)
        _save_checkpoint(checkpoint, self.expedition_dir)

        # cache original schedule for reference and/or restoring later if needed (checkpoint can be
        # overwritten if multiple problems occur so is not a persistent record of original schedule)
        schedule_original_path = (
            self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR / SCHEDULE_ORIGINAL
        )
        if not schedule_original_path.exists():
            schedule_original_path.parent.mkdir(parents=True, exist_ok=True)
            self._cache_original_schedule(self.schedule, schedule_original_path)

        # pause simulation
        sys.exit(0)

    def _make_checkpoint(self, failed_waypoint_i: int | None = None) -> Checkpoint:
        """Make checkpoint, also handling pre-departure (failed_waypoint_i of None)."""
        return Checkpoint(
            past_schedule=self.schedule, failed_waypoint_i=failed_waypoint_i
        )

    def _make_hash(self, s: str, length: int) -> str:
        """
        Make unique hash for problem occurrence.

        :param s: Text to hash.
        :param length: Number of hex characters in the result; must be even.
        :raises ValueError: If `length` is odd.
        """
        if length % 2 != 0:
            raise ValueError("Length must be even.")
        # each digest byte is rendered as two hex characters
        return hashlib.shake_128(s.encode("utf-8")).hexdigest(length // 2)

    def _hash_to_json(
        self,
        problem: type[InstrumentProblem] | type[GeneralProblem],
        problem_hash: str,
        failed_waypoint_i: int | None,
        hash_path: Path,
    ) -> dict:
        """
        Write problem details + hash to a json file.

        :return: The record that was written.
        """
        (self.expedition_dir / PROBLEMS_ENCOUNTERED_DIR).mkdir(
            parents=True, exist_ok=True
        )
        hash_data = {
            "problem_hash": problem_hash,
            "message": problem.message,
            # builtin int (not a NumPy integer) so the value is JSON serialisable
            "failed_waypoint_i": (
                None if failed_waypoint_i is None else int(failed_waypoint_i)
            ),
            "delay_duration_hours": problem.delay_duration.total_seconds() / 3600.0,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "resolved": False,
        }
        with open(hash_path, "w", encoding="utf-8") as f:
            json.dump(hash_data, f, indent=4)
        return hash_data

    def _cache_original_schedule(self, schedule: Schedule, path: Path | str):
        """Cache original schedule to file for reference, as a checkpoint object."""
        schedule_original = Checkpoint(past_schedule=schedule)
        schedule_original.to_yaml(path)
        print(f"\nOriginal schedule cached to {path}.\n")
# --- src/virtualship/models/checkpoint.py: Checkpoint.verify ---


def verify(self, schedule: Schedule, expedition_dir: Path) -> None:
    """
    Verify that the given schedule matches the checkpoint's past schedule, and/or that any problem has been resolved.

    Addresses changes made by the user in response to both i) scheduling issues
    arising from not enough time for the ship to travel between waypoints, and
    ii) problems encountered during simulation.

    A problem counts as resolved when every compared waypoint in ``schedule`` is
    at least the problem's delay later than the same waypoint in the checkpoint's
    past schedule. Only waypoints present in both schedules are compared. The
    problem's JSON record is then rewritten with ``"resolved": true``.

    :param schedule: The (possibly user-edited) schedule to verify against the checkpoint.
    :param expedition_dir: Expedition directory containing the problems directory.
    :raises CheckpointError: If waypoints before the failed waypoint were changed,
        or if the first unresolved problem's delay is not accounted for in the schedule.
    """
    failed_i = None if self.failed_waypoint_i is None else int(self.failed_waypoint_i)
    past_waypoints = self.past_schedule.waypoints

    # 1) check that past waypoints have not been changed, unless is a pre-departure problem
    # TODO: double check this still works as intended for the user defined schedule with not enough time between waypoints case
    if failed_i is not None and not (
        schedule.waypoints[:failed_i] == past_waypoints[:failed_i]
    ):
        raise CheckpointError(
            f"Past waypoints in schedule have been changed! Restore past schedule and only change future waypoints (waypoint {failed_i + 1} onwards)."
        )

    # 2) check that problems have been resolved in the new schedule
    problems_dir = Path(expedition_dir, PROBLEMS_ENCOUNTERED_DIR)
    # sorted: glob order is filesystem-dependent, and "first unresolved problem" should be deterministic
    for problem_path in sorted(problems_dir.glob("problem_*.json")):
        with open(problem_path, encoding="utf-8") as f:
            problem = json.load(f)
        if problem["resolved"]:
            continue

        # delay associated with the problem
        delay_duration = timedelta(hours=float(problem["delay_duration_hours"]))

        # Compare from the failed waypoint onwards (or from the start when there is
        # none), but never past the end of either schedule: the user may have added
        # or removed waypoints, which previously raised IndexError here.
        first_i = 0 if failed_i is None else failed_i
        n_comparable = min(len(schedule.waypoints), len(past_waypoints))
        time_deltas = [
            schedule.waypoints[i].time - past_waypoints[i].time
            for i in range(first_i, n_comparable)
        ]

        if not all(td >= delay_duration for td in time_deltas):
            affected_waypoints = (
                "all waypoints"
                if failed_i is None
                else f"waypoint {failed_i + 1} onwards"
            )
            raise CheckpointError(
                f"The problem encountered in previous simulation has not been resolved in the schedule! Please adjust the schedule to account for delays caused by the problem (by using `virtualship plan` or directly editing the {EXPEDITION} file).\n"
                f"The problem was associated with a delay duration of {problem['delay_duration_hours']} hours affecting {affected_waypoints}.",
            )

        print("\n\n🎉 Previous problem has been resolved in the schedule.\n")

        # save back to json file changing the resolved status to True
        problem["resolved"] = True
        with open(problem_path, "w", encoding="utf-8") as f_out:
            json.dump(problem, f_out, indent=4)

        # only handle the first unresolved problem found; others will be handled in subsequent runs but are not yet known to the user
        break


# --- src/virtualship/utils.py: file-name constants and problem registries ---

EXPEDITION = "expedition.yaml"
CHECKPOINT = "checkpoint.yaml"
SCHEDULE_ORIGINAL = "schedule_original.yaml"
PROBLEMS_ENCOUNTERED_DIR = "problems_encountered"

# problems inventory registry and registration utilities
# Keyed by instrument type, so this must be a dict: it was a list, on which the
# keyed assignment in `register_instrument_problem` fails.
INSTRUMENT_PROBLEM_MAP = {}
GENERAL_PROBLEM_REG = []


def register_instrument_problem(instrument_type):
    """
    Return a class decorator that registers the class in ``INSTRUMENT_PROBLEM_MAP``.

    :param instrument_type: Key under which the decorated class is stored.
    :return: Decorator that stores the class and returns it unchanged.
    """

    def decorator(cls):
        INSTRUMENT_PROBLEM_MAP[instrument_type] = cls
        return cls

    return decorator


def register_general_problem():
    """
    Return a class decorator that appends the class to ``GENERAL_PROBLEM_REG``.

    :return: Decorator that stores the class and returns it unchanged.
    """

    def decorator(cls):
        GENERAL_PROBLEM_REG.append(cls)
        return cls

    return decorator
def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None:
    """
    Write the checkpoint to the expedition directory's checkpoint file.

    :param checkpoint: Checkpoint to serialise via its ``to_yaml`` method.
    :param expedition_dir: Directory in which the ``CHECKPOINT`` file is written.
    """
    checkpoint.to_yaml(expedition_dir / CHECKPOINT)