Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7599,6 +7599,113 @@ def pattern2hex(pattern: List[bool]) -> str:
cb=blink_pattern_hex,
)

async def verify_and_wait_for_carriers(
self,
check_interval: float = 1.0,
):
"""Verify that carriers have been loaded at expected rail positions.

This function checks if carriers are physically present on the deck at the specified
rail positions using the deck's presence sensors. If any carriers are missing, it will:
1. Prompt the user to load the missing carriers
2. Flash LEDs at the missing positions using set_loading_indicators
3. Continue checking until all carriers are detected

Args:
check_interval: Interval in seconds between presence checks (default: 1.0)

Raises:
ValueError: If no carriers are found on the deck.
"""
# Extract carriers from deck children with start and end rail positions
track_width = 22.5
carrier_rails = [] # List of (start_rail, end_rail) tuples

for child in self.deck.children:
if isinstance(child, Carrier):
# Get x coordinate relative to deck
carrier_x = child.get_location_wrt(self.deck).x
carrier_start_rail = int((carrier_x - 100.0) / track_width) + 1
carrier_end_rail = int((carrier_x - 100.0 + child.get_absolute_size_x()) / track_width)

# Verify rails are valid
carrier_start_rail = max(1, min(carrier_start_rail, 54))
if 1 <= carrier_end_rail <= 54:
carrier_rails.append((carrier_start_rail, carrier_end_rail))

if not carrier_rails:
raise ValueError("No carriers found on deck. Assign carriers to the deck.")

# Extract end rails for comparison with detected rails
# The presence detection reports the end rail position
expected_end_rails = [end_rail for _, end_rail in carrier_rails]

# Check initial presence
detected_rails = set(await self.request_presence_of_carriers_on_deck())
missing_end_rails = sorted(set(expected_end_rails) - detected_rails)

if not missing_end_rails:
logger.info(f"All carriers detected at end rail positions: {expected_end_rails}")
# Turn off all indicators
await self.set_loading_indicators(
bit_pattern=[False] * 54,
blink_pattern=[False] * 54,
)
print(f"\n✓ All carriers successfully detected at end rail positions: {expected_end_rails}\n")
return

# Prompt user about missing carriers
print(
f"\n{'='*60}\n"
f"CARRIER LOADING REQUIRED\n"
f"{'='*60}\n"
f"Expected carriers at end rail positions: {expected_end_rails}\n"
f"Detected carriers at rail positions: {sorted(detected_rails)}\n"
f"Missing carriers at end rail positions: {missing_end_rails}\n"
f"{'='*60}\n"
f"Please load the missing carriers. LEDs will flash at the carrier positions.\n"
f"The system will automatically detect when all carriers are loaded.\n"
f"{'='*60}\n"
)

# Flash LEDs until all carriers are detected
while missing_end_rails:
# Create bit pattern for missing carriers
# Flash all LEDs from start_rail to end_rail (inclusive) for each missing carrier
bit_pattern = [False] * 54
blink_pattern = [False] * 54

# For each missing carrier (identified by missing end rail), flash all its rails
for missing_end_rail in missing_end_rails:
# Find the carrier with this end rail
for start_rail, end_rail in carrier_rails:
if end_rail == missing_end_rail:
# Flash all LEDs from start_rail to end_rail (inclusive)
for rail in range(start_rail, end_rail + 1):
if 1 <= rail <= 54:
indicator_index = rail - 1 # Convert rail (1-54) to index (0-53)
bit_pattern[indicator_index] = True
blink_pattern[indicator_index] = True
break

# Set loading indicators
await self.set_loading_indicators(bit_pattern[::-1], blink_pattern[::-1])

# Wait before checking again
await asyncio.sleep(check_interval)

# Check for presence again
detected_rails = set(await self.request_presence_of_carriers_on_deck())
missing_end_rails = sorted(set(expected_end_rails) - detected_rails)

# All carriers detected, turn off all indicators
logger.info(f"All carriers successfully detected at end rail positions: {expected_end_rails}")
await self.set_loading_indicators(
bit_pattern=[False] * 54,
blink_pattern=[False] * 54,
)
print(f"\n✓ All carriers successfully loaded and detected!\n")

async def unload_carrier(
self,
carrier: Carrier,
Expand Down
Loading