diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 987fb9b53da..9ff7f2e474a 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -7599,6 +7599,113 @@ def pattern2hex(pattern: List[bool]) -> str: cb=blink_pattern_hex, ) + async def verify_and_wait_for_carriers( + self, + check_interval: float = 1.0, + ): + """Verify that carriers have been loaded at expected rail positions. + + This function checks if carriers are physically present on the deck at the specified + rail positions using the deck's presence sensors. If any carriers are missing, it will: + 1. Prompt the user to load the missing carriers + 2. Flash LEDs at the missing positions using set_loading_indicators + 3. Continue checking until all carriers are detected + + Args: + check_interval: Interval in seconds between presence checks (default: 1.0) + + Raises: + ValueError: If no carriers are found on the deck. + """ + # Extract carriers from deck children with start and end rail positions + track_width = 22.5 + carrier_rails = [] # List of (start_rail, end_rail) tuples + + for child in self.deck.children: + if isinstance(child, Carrier): + # Get x coordinate relative to deck + carrier_x = child.get_location_wrt(self.deck).x + carrier_start_rail = int((carrier_x - 100.0) / track_width) + 1 + carrier_end_rail = int((carrier_x - 100.0 + child.get_absolute_size_x()) / track_width) + + # Verify rails are valid + carrier_start_rail = max(1, min(carrier_start_rail, 54)) + if 1 <= carrier_end_rail <= 54: + carrier_rails.append((carrier_start_rail, carrier_end_rail)) + + if not carrier_rails: + raise ValueError("No carriers found on deck. Assign carriers to the deck.") + + # Extract end rails for comparison with detected rails + # The presence detection reports the end rail position + expected_end_rails = [end_rail for _, end_rail in carrier_rails] + + # Check initial presence + detected_rails = set(await self.request_presence_of_carriers_on_deck()) + missing_end_rails = sorted(set(expected_end_rails) - detected_rails) + + if not missing_end_rails: + logger.info(f"All carriers detected at end rail positions: {expected_end_rails}") + # Turn off all indicators + await self.set_loading_indicators( + bit_pattern=[False] * 54, + blink_pattern=[False] * 54, + ) + print(f"\n✓ All carriers successfully detected at end rail positions: {expected_end_rails}\n") + return + + # Prompt user about missing carriers + print( + f"\n{'='*60}\n" + f"CARRIER LOADING REQUIRED\n" + f"{'='*60}\n" + f"Expected carriers at end rail positions: {expected_end_rails}\n" + f"Detected carriers at rail positions: {sorted(detected_rails)}\n" + f"Missing carriers at end rail positions: {missing_end_rails}\n" + f"{'='*60}\n" + f"Please load the missing carriers. LEDs will flash at the carrier positions.\n" + f"The system will automatically detect when all carriers are loaded.\n" + f"{'='*60}\n" + ) + + # Flash LEDs until all carriers are detected + while missing_end_rails: + # Create bit pattern for missing carriers + # Flash all LEDs from start_rail to end_rail (inclusive) for each missing carrier + bit_pattern = [False] * 54 + blink_pattern = [False] * 54 + + # For each missing carrier (identified by missing end rail), flash all its rails + for missing_end_rail in missing_end_rails: + # Find the carrier with this end rail + for start_rail, end_rail in carrier_rails: + if end_rail == missing_end_rail: + # Flash all LEDs from start_rail to end_rail (inclusive) + for rail in range(start_rail, end_rail + 1): + if 1 <= rail <= 54: + indicator_index = rail - 1 # Convert rail (1-54) to index (0-53) + bit_pattern[indicator_index] = True + blink_pattern[indicator_index] = True + break + + # Set loading indicators + await self.set_loading_indicators(bit_pattern[::-1], blink_pattern[::-1]) + + # Wait before checking again + await asyncio.sleep(check_interval) + + # Check for presence again + detected_rails = set(await self.request_presence_of_carriers_on_deck()) + missing_end_rails = sorted(set(expected_end_rails) - detected_rails) + + # All carriers detected, turn off all indicators + logger.info(f"All carriers successfully detected at end rail positions: {expected_end_rails}") + await self.set_loading_indicators( + bit_pattern=[False] * 54, + blink_pattern=[False] * 54, + ) + print(f"\n✓ All carriers successfully loaded and detected!\n") + async def unload_carrier( self, carrier: Carrier,