modlink_core
"""Public API surface of the modlink_core package."""

from .bus import FrameStream, FrameStreamOverflowError, StreamBus
from .event_stream import (
    BackendEventBroker,
    EventStream,
    EventStreamOverflowError,
    StreamClosedError,
)
from .events import (
    DriverConnectionLostEvent,
    DriverExecutorFailedEvent,
    RecordingFailedEvent,
    SettingChangedEvent,
)
from .logging_setup import configure_host_logging
from .models import (
    DriverSnapshot,
    ExportJobSnapshot,
    RecordingSnapshot,
    RecordingStartSummary,
    RecordingStopSummary,
    ReplayMarker,
    ReplayRecordingSummary,
    ReplaySegment,
    ReplaySnapshot,
)
from .recording import RecordingBackend
from .replay import ReplayBackend
from .runtime import ModLinkEngine
from .settings import SettingsStore

# Alphabetically sorted so additions have an unambiguous home and
# duplicates are easy to spot in review; the exported set is unchanged.
__all__ = [
    "BackendEventBroker",
    "DriverConnectionLostEvent",
    "DriverExecutorFailedEvent",
    "DriverSnapshot",
    "EventStream",
    "EventStreamOverflowError",
    "ExportJobSnapshot",
    "FrameStream",
    "FrameStreamOverflowError",
    "ModLinkEngine",
    "RecordingBackend",
    "RecordingFailedEvent",
    "RecordingSnapshot",
    "RecordingStartSummary",
    "RecordingStopSummary",
    "ReplayBackend",
    "ReplayMarker",
    "ReplayRecordingSummary",
    "ReplaySegment",
    "ReplaySnapshot",
    "SettingChangedEvent",
    "SettingsStore",
    "StreamBus",
    "StreamClosedError",
    "configure_host_logging",
]
class
RecordingBackend:
class RecordingBackend:
    """Threaded recording backend that persists bus frames to disk.

    All disk work happens on a dedicated worker thread. Public methods
    enqueue commands and return ``Future`` objects that resolve once the
    worker has processed them; errors surface as the future's exception.
    """

    def __init__(
        self,
        bus: StreamBus,
        *,
        settings: SettingsStore,
        publish_event: Callable[[BackendEvent], None],
        parent: object | None = None,
    ) -> None:
        self._bus = bus
        self._settings = settings
        self._publish_event = publish_event
        self._parent = parent
        # Queue items are (callable, Future) pairs; the sentinel object
        # requests a worker shutdown.
        self._command_queue: queue.Queue[RecordingCommand | object] = queue.Queue()
        self._shutdown_sentinel = object()
        self._thread: threading.Thread | None = None
        self._frame_stream: FrameStream | None = None
        self._state = "idle"
        self._started = False
        self._accepting_commands = False
        self._worker_exit_error: Exception | None = None
        self._active_recording: ActiveRecording | None = None
        self._lock = threading.RLock()

    @property
    def root_dir(self) -> Path:
        """Storage root directory resolved from the current settings."""
        return resolved_storage_root_dir(self._settings)

    @property
    def is_started(self) -> bool:
        """True while the worker thread exists and is alive."""
        thread = self._thread
        return self._started and thread is not None and thread.is_alive()

    @property
    def state(self) -> str:
        """Current backend state string ("idle" or "recording")."""
        return self._state

    @property
    def is_recording(self) -> bool:
        """True while a recording is actively capturing frames."""
        return self._state == "recording"

    def snapshot(self) -> RecordingSnapshot:
        """Return an immutable snapshot of the backend's public state."""
        return RecordingSnapshot(
            state=self._state,
            is_started=self.is_started,
            is_recording=self.is_recording,
            root_dir=str(self.root_dir),
        )

    def start(self) -> None:
        """Start the worker thread; a no-op if it is already running."""
        if self.is_started:
            self._started = True
            self._accepting_commands = True
            return
        logger.info("Starting recording backend")
        if self._frame_stream is None or self._frame_stream.closed:
            self._frame_stream = self._open_frame_stream()
        thread = threading.Thread(
            target=self._run,
            name="modlink.recording",
            daemon=True,
        )
        self._thread = thread
        self._worker_exit_error = None
        self._accepting_commands = True
        self._started = True
        thread.start()
        logger.info("Recording backend started")

    def start_recording(self, recording_label: str | None = None) -> Future[RecordingStartSummary]:
        """Begin a new recording; the future resolves with a start summary."""
        storage_root_dir = self.root_dir
        recordings_root_dir = storage_root_dir / "recordings"
        # Paths and descriptors are captured here so the worker sees a
        # consistent view even if settings or the bus change afterwards.
        return self._submit_command(
            self._start_recording_worker,
            str(storage_root_dir),
            str(recordings_root_dir),
            recording_label,
            self._bus.descriptors(),
        )

    def stop_recording(self) -> Future[RecordingStopSummary]:
        """Stop the active recording; the future resolves with a stop summary."""
        return self._submit_command(self._stop_recording_worker)

    def add_marker(self, label: str | None = None) -> Future[None]:
        """Append a timestamped marker to the active recording."""
        return self._submit_command(self._add_marker_worker, label)

    def add_segment(
        self,
        start_ns: int,
        end_ns: int,
        label: str | None = None,
    ) -> Future[None]:
        """Append a labelled time segment [start_ns, end_ns] to the recording."""
        return self._submit_command(
            self._add_segment_worker,
            start_ns,
            end_ns,
            label,
        )

    def shutdown(self, *, timeout_ms: int = 3000) -> None:
        """Stop the worker thread, re-raising any stored worker exit error.

        Raises TimeoutError if the worker does not exit within
        ``timeout_ms`` milliseconds.
        """
        logger.info("Shutting down recording backend")
        with self._lock:
            self._accepting_commands = False
            thread = self._thread

        if thread is None or not thread.is_alive():
            self._finalize_shutdown()
            # Take (clear + read) under the lock so concurrent callers
            # cannot re-raise the same error twice.
            worker_exit_error = self._take_worker_exit_error()
            if worker_exit_error is not None:
                raise worker_exit_error
            return

        self._command_queue.put(self._shutdown_sentinel)
        thread.join(max(0, timeout_ms) / 1000)

        if thread.is_alive():
            raise TimeoutError(f"recording shutdown timed out after {timeout_ms}ms")

        self._finalize_shutdown()
        worker_exit_error = self._take_worker_exit_error()
        if worker_exit_error is not None:
            raise worker_exit_error
        logger.info("Recording backend shut down")

    def _run(self) -> None:
        """Worker loop: drain commands, then persist frames while recording."""
        exit_error: Exception | None = None
        while True:
            try:
                if self._drain_commands():
                    return

                frame_stream = self._frame_stream
                if frame_stream is None:
                    time.sleep(0.01)
                    continue

                try:
                    frame = frame_stream.read(timeout=0.05)
                except queue.Empty:
                    continue
                except StreamClosedError:
                    # During shutdown a closed stream is expected; otherwise
                    # recover by reopening a fresh stream.
                    if not self._started:
                        return
                    logger.warning("Recording frame stream closed unexpectedly; reopening stream")
                    self._frame_stream = self._open_frame_stream()
                    continue
                except FrameStreamOverflowError as exc:
                    self._handle_frame_stream_overflow(exc.consumer_name)
                    continue

                if not self.is_recording:
                    continue
                self._on_frame_worker(frame)
            except Exception as exc:
                logger.exception("Recording worker exited with an unexpected error")
                exit_error = exc
                break

        # Record the failure so the next shutdown() can re-raise it.
        with self._lock:
            self._worker_exit_error = exit_error
            self._started = False
            self._accepting_commands = False

    def _drain_commands(self) -> bool:
        """Run all queued commands; return True when shutdown was requested."""
        while True:
            try:
                item = self._command_queue.get_nowait()
            except queue.Empty:
                return False

            if item is self._shutdown_sentinel:
                try:
                    self._shutdown_worker()
                finally:
                    # Commands queued behind the sentinel will never run;
                    # fail their futures so callers do not hang.
                    self._fail_pending_commands("ACQ_SHUTDOWN")
                return True

            command, _future = item
            command()

    def _on_frame_worker(self, frame: object) -> None:
        """Append one bus frame to the active recording (worker thread only)."""
        if not isinstance(frame, FrameEnvelope):
            return
        if self._active_recording is None:
            return

        try:
            append_recording_frame(
                Path(self._active_recording.root_dir),
                self._active_recording.recording_id,
                frame,
            )
            self._active_recording.frame_counts_by_stream[frame.stream_id] += 1
        except Exception:
            logger.exception("Recording append failed; marking recording as failed")
            self._fail_recording_worker("write_failed")

    def _start_recording_worker(
        self,
        root_dir: str,
        recordings_dir: str,
        recording_label: object,
        recording_descriptors: object,
    ) -> RecordingStartSummary:
        """Create the on-disk recording and mark the backend as recording."""
        if self._active_recording is not None:
            raise RuntimeError("ACQ_ALREADY_RECORDING")

        # Descriptors crossed a thread boundary; validate the snapshot shape.
        if not isinstance(recording_descriptors, dict) or not all(
            isinstance(value, StreamDescriptor) for value in recording_descriptors.values()
        ):
            raise RuntimeError("ACQ_INVALID_DESCRIPTOR_SNAPSHOT")

        started_at_ns = time.time_ns()
        try:
            recording_id = create_recording(
                Path(root_dir),
                recording_descriptors,
                recording_label=recording_label or None,
            )
        except Exception as exc:
            self._active_recording = None
            raise RuntimeError(f"ACQ_START_FAILED: {type(exc).__name__}: {exc}") from exc

        recording_path = str(Path(recordings_dir) / recording_id)
        self._active_recording = ActiveRecording(
            root_dir=root_dir,
            recording_id=recording_id,
            recording_path=recording_path,
            started_at_ns=started_at_ns,
            frame_counts_by_stream={stream_id: 0 for stream_id in recording_descriptors},
        )
        self._set_state("recording")
        logger.info("Started recording %s at %s", recording_id, recording_path)
        return RecordingStartSummary(
            recording_id=recording_id,
            recording_path=recording_path,
            started_at_ns=started_at_ns,
        )

    def _stop_recording_worker(self) -> RecordingStopSummary:
        return self._stop_recording_worker_with_policy(publish_failure=True)

    def _stop_recording_worker_with_policy(
        self,
        *,
        publish_failure: bool,
    ) -> RecordingStopSummary:
        """Finish the active recording and return its summary."""
        if self._active_recording is None:
            raise RuntimeError("ACQ_STOP_IGNORED: not recording")

        # NOTE(review): publish_failure is currently unused here; kept for
        # interface stability with callers that pass it.
        _ = publish_failure
        stopped_at_ns = time.time_ns()
        active_recording = self._active_recording
        self._active_recording = None
        self._set_state("idle")
        logger.info("Stopped recording %s", active_recording.recording_id)
        return RecordingStopSummary(
            recording_id=active_recording.recording_id,
            recording_path=active_recording.recording_path,
            started_at_ns=active_recording.started_at_ns,
            stopped_at_ns=stopped_at_ns,
            status="completed",
            frame_counts_by_stream=dict(active_recording.frame_counts_by_stream),
        )

    def _add_marker_worker(self, label: object) -> None:
        """Persist a marker at the current wall-clock time."""
        if self._active_recording is None:
            raise RuntimeError("ACQ_MARKER_REJECTED: not recording")

        timestamp_ns = time.time_ns()
        normalized_label = label or None
        try:
            add_recording_marker(
                Path(self._active_recording.root_dir),
                self._active_recording.recording_id,
                timestamp_ns=timestamp_ns,
                label=normalized_label,
            )
        except Exception as exc:
            raise RuntimeError(f"ACQ_MARKER_FAILED: {type(exc).__name__}: {exc}") from exc

    def _add_segment_worker(
        self,
        start_ns: object,
        end_ns: object,
        label: object,
    ) -> None:
        """Validate and persist a time segment on the active recording."""
        if self._active_recording is None:
            raise RuntimeError("ACQ_SEGMENT_REJECTED: not recording")

        try:
            start_value = int(start_ns)
            end_value = int(end_ns)
        except (TypeError, ValueError) as exc:
            # Chain the cause so the original conversion failure is visible.
            raise RuntimeError("ACQ_INVALID_SEGMENT_RANGE") from exc

        if start_value > end_value:
            raise RuntimeError("ACQ_INVALID_SEGMENT_RANGE: start_ns > end_ns")

        normalized_label = label or None
        try:
            add_recording_segment(
                Path(self._active_recording.root_dir),
                self._active_recording.recording_id,
                start_ns=start_value,
                end_ns=end_value,
                label=normalized_label,
            )
        except Exception as exc:
            raise RuntimeError(f"ACQ_SEGMENT_FAILED: {type(exc).__name__}: {exc}") from exc

    def _handle_frame_stream_overflow(self, consumer_name: str) -> None:
        """Fail the active recording (frames were lost) and reopen the stream."""
        logger.warning("Recording frame stream overflowed for consumer '%s'", consumer_name)
        if self._active_recording is not None:
            self._fail_recording_worker("frame_stream_overflow")
        self._replace_frame_stream()

    def _replace_frame_stream(self) -> None:
        """Swap in a fresh frame stream, closing the previous one."""
        previous_stream = self._frame_stream
        self._frame_stream = self._open_frame_stream()
        if previous_stream is not None:
            previous_stream.close()

    def _open_frame_stream(self) -> FrameStream:
        # drop_policy="error" makes overflow observable (raises
        # FrameStreamOverflowError) rather than silently dropping frames.
        return self._bus.open_frame_stream(
            maxsize=256,
            drop_policy="error",
            consumer_name=RECORDING_CONSUMER_NAME,
        )

    def _shutdown_worker(self) -> None:
        """Worker-side shutdown: cleanly close any recording in progress."""
        if self._active_recording is not None:
            self._stop_recording_worker_with_policy(publish_failure=False)

    def _fail_recording_worker(self, reason: str) -> None:
        """Abort the active recording and publish a failure event."""
        active_recording = self._active_recording
        if active_recording is None:
            return

        stopped_at_ns = time.time_ns()
        self._active_recording = None
        self._set_state("idle")
        self._publish_recording_failed(
            active_recording,
            stopped_at_ns=stopped_at_ns,
            reason=reason,
        )

    def _publish_recording_failed(
        self,
        active_recording: ActiveRecording,
        *,
        stopped_at_ns: int,
        reason: str,
    ) -> None:
        self._publish_event(
            RecordingFailedEvent(
                recording_id=active_recording.recording_id,
                recording_path=active_recording.recording_path,
                frame_counts_by_stream=dict(active_recording.frame_counts_by_stream),
                reason=reason,
                ts_ns=stopped_at_ns,
            )
        )

    def _set_state(self, state: str) -> None:
        with self._lock:
            if state == self._state:
                return
            self._state = state

    def _submit_command(
        self,
        worker_method: Callable[..., object],
        *args: object,
    ) -> Future[object]:
        """Queue *worker_method(*args)* for the worker; return its future."""
        future: Future[object] = Future()
        if not self.is_started:
            future.set_exception(RuntimeError("ACQ_NOT_STARTED"))
            return future
        if not self._accepting_commands:
            future.set_exception(RuntimeError("ACQ_SHUTTING_DOWN"))
            return future

        def _run_command() -> None:
            try:
                result = worker_method(*args)
            except Exception as exc:
                if not future.done():
                    future.set_exception(exc)
                return
            if not future.done():
                future.set_result(result)

        self._command_queue.put((_run_command, future))
        return future

    def _fail_pending_commands(self, message: str) -> None:
        """Resolve every queued command future with a RuntimeError(message)."""
        while True:
            try:
                item = self._command_queue.get_nowait()
            except queue.Empty:
                return
            if item is self._shutdown_sentinel:
                continue
            _command, future = item
            if future is None or future.done():
                continue
            future.set_exception(RuntimeError(message))

    def _finalize_shutdown(self) -> None:
        """Reset state after the worker has stopped.

        _accepting_commands is deliberately left True so a subsequent
        start() works; submissions are still rejected while stopped
        because is_started gates them first.
        """
        self._set_state("idle")
        self._started = False
        self._accepting_commands = True
        self._thread = None
        if self._frame_stream is not None:
            self._frame_stream.close()
            self._frame_stream = None

    def _take_worker_exit_error(self) -> Exception | None:
        """Atomically read and clear the stored worker exit error."""
        with self._lock:
            error = self._worker_exit_error
            self._worker_exit_error = None
            return error
Threaded recording backend that persists bus frames to disk.
RecordingBackend( bus: StreamBus, *, settings: SettingsStore, publish_event: Callable[[BackendEvent], None], parent: object | None = None)
49 def __init__( 50 self, 51 bus: StreamBus, 52 *, 53 settings: SettingsStore, 54 publish_event: Callable[[BackendEvent], None], 55 parent: object | None = None, 56 ) -> None: 57 self._bus = bus 58 self._settings = settings 59 self._publish_event = publish_event 60 self._parent = parent 61 self._command_queue: queue.Queue[RecordingCommand | object] = queue.Queue() 62 self._shutdown_sentinel = object() 63 self._thread: threading.Thread | None = None 64 self._frame_stream: FrameStream | None = None 65 self._state = "idle" 66 self._started = False 67 self._accepting_commands = False 68 self._worker_exit_error: Exception | None = None 69 self._active_recording: ActiveRecording | None = None 70 self._lock = threading.RLock()
def
start(self) -> None:
97 def start(self) -> None: 98 if self.is_started: 99 self._started = True 100 self._accepting_commands = True 101 return 102 logger.info("Starting recording backend") 103 if self._frame_stream is None or self._frame_stream.closed: 104 self._frame_stream = self._open_frame_stream() 105 thread = threading.Thread( 106 target=self._run, 107 name="modlink.recording", 108 daemon=True, 109 ) 110 self._thread = thread 111 self._worker_exit_error = None 112 self._accepting_commands = True 113 self._started = True 114 thread.start() 115 logger.info("Recording backend started")
def
start_recording( self, recording_label: str | None = None) -> concurrent.futures.Future[RecordingStartSummary]:
117 def start_recording(self, recording_label: str | None = None) -> Future[RecordingStartSummary]: 118 storage_root_dir = self.root_dir 119 recordings_root_dir = storage_root_dir / "recordings" 120 return self._submit_command( 121 self._start_recording_worker, 122 str(storage_root_dir), 123 str(recordings_root_dir), 124 recording_label, 125 self._bus.descriptors(), 126 )
def
add_segment( self, start_ns: int, end_ns: int, label: str | None = None) -> concurrent.futures.Future[None]:
def
shutdown(self, *, timeout_ms: int = 3000) -> None:
147 def shutdown(self, *, timeout_ms: int = 3000) -> None: 148 logger.info("Shutting down recording backend") 149 with self._lock: 150 self._accepting_commands = False 151 thread = self._thread 152 worker_exit_error = self._worker_exit_error 153 154 if thread is None or not thread.is_alive(): 155 self._finalize_shutdown() 156 if worker_exit_error is not None: 157 self._worker_exit_error = None 158 raise worker_exit_error 159 return 160 161 self._command_queue.put(self._shutdown_sentinel) 162 thread.join(max(0, timeout_ms) / 1000) 163 164 if thread.is_alive(): 165 raise TimeoutError(f"recording shutdown timed out after {timeout_ms}ms") 166 167 self._finalize_shutdown() 168 worker_exit_error = self._take_worker_exit_error() 169 if worker_exit_error is not None: 170 raise worker_exit_error 171 logger.info("Recording backend shut down")
@dataclass(frozen=True, slots=True)
class
RecordingSnapshot:
@dataclass(frozen=True, slots=True)
class
RecordingStartSummary:
@dataclass(frozen=True, slots=True)
class
RecordingStopSummary:
@dataclass(frozen=True, slots=True)
class RecordingStopSummary:
    """Immutable summary returned when a recording is stopped."""

    # Identifier of the stopped recording.
    recording_id: str
    # Filesystem path of the recording directory.
    recording_path: str
    # Wall-clock start/stop timestamps in nanoseconds.
    started_at_ns: int
    stopped_at_ns: int
    # Terminal status string, e.g. "completed".
    status: str
    # Number of frames persisted, keyed by stream id.
    frame_counts_by_stream: dict[str, int]
class
ReplayBackend:
class ReplayBackend:
    """Threaded replay backend that plays recorded frames onto its own bus.

    All playback and catalogue work happens on a dedicated worker thread.
    Public methods enqueue commands and return ``Future`` objects that
    resolve once the worker has processed them.
    """

    def __init__(
        self,
        *,
        settings: SettingsStore,
        parent: object | None = None,
    ) -> None:
        self._settings = settings
        self._parent = parent
        # Replayed frames are re-published on this backend-owned bus.
        self.bus = StreamBus(event_broker=BackendEventBroker(), parent=self)
        self._export_service = ExportService()
        # Queue items are (callable, Future) pairs; the sentinel object
        # requests a worker shutdown.
        self._command_queue: queue.Queue[ReplayCommand | object] = queue.Queue()
        self._shutdown_sentinel = object()
        self._thread: threading.Thread | None = None
        self._state = "idle"
        self._started = False
        self._accepting_commands = False
        self._worker_exit_error: Exception | None = None
        self._lock = threading.RLock()
        self._recordings: tuple[ReplayRecordingSummary, ...] = ()
        self._reader: RecordingReader | None = None
        self._markers: tuple[ReplayMarker, ...] = ()
        self._segments: tuple[ReplaySegment, ...] = ()
        self._position_ns = 0
        self._speed_multiplier = 1.0
        self._timeline_index = 0
        # Anchors captured when playback starts; current position is derived
        # from elapsed wall time scaled by the speed multiplier.
        self._play_started_wall_ns = 0
        self._play_started_position_ns = 0

    @property
    def is_started(self) -> bool:
        """True while the worker thread exists and is alive."""
        thread = self._thread
        return self._started and thread is not None and thread.is_alive()

    def start(self) -> None:
        """Start the worker thread and kick off a recordings refresh."""
        if self.is_started:
            self._started = True
            self._accepting_commands = True
            return
        logger.info("Starting replay backend")
        self._export_service.start()
        thread = threading.Thread(
            target=self._run,
            name="modlink.replay",
            daemon=True,
        )
        self._thread = thread
        self._worker_exit_error = None
        self._accepting_commands = True
        self._started = True
        thread.start()
        self.refresh_recordings()

    def shutdown(self, *, timeout_ms: int = 3000) -> None:
        """Stop the worker and export service, re-raising worker errors.

        Raises TimeoutError if the worker does not exit within
        ``timeout_ms`` milliseconds.
        """
        logger.info("Shutting down replay backend")
        with self._lock:
            self._accepting_commands = False
            thread = self._thread

        if thread is None or not thread.is_alive():
            self._finalize_shutdown()
            self._export_service.shutdown(timeout_ms=timeout_ms)
            # Take (clear + read) under the lock so concurrent callers
            # cannot re-raise the same error twice.
            worker_exit_error = self._take_worker_exit_error()
            if worker_exit_error is not None:
                raise worker_exit_error
            return

        self._command_queue.put(self._shutdown_sentinel)
        thread.join(max(0, timeout_ms) / 1000)
        if thread.is_alive():
            raise TimeoutError(f"replay shutdown timed out after {timeout_ms}ms")

        self._finalize_shutdown()
        self._export_service.shutdown(timeout_ms=timeout_ms)
        worker_exit_error = self._take_worker_exit_error()
        if worker_exit_error is not None:
            raise worker_exit_error

    def snapshot(self) -> ReplaySnapshot:
        """Return an immutable snapshot of the replay state."""
        reader = self._reader
        return ReplaySnapshot(
            state=self._state,
            is_started=self.is_started,
            recording_id=None if reader is None else reader.recording_id,
            recording_path=None if reader is None else str(reader.recording_path),
            position_ns=int(self._position_ns),
            duration_ns=0 if reader is None else int(reader.duration_ns),
            speed_multiplier=float(self._speed_multiplier),
        )

    def recordings(self) -> tuple[ReplayRecordingSummary, ...]:
        """Most recently refreshed recording catalogue."""
        return self._recordings

    def markers(self) -> tuple[ReplayMarker, ...]:
        """Markers of the currently open recording."""
        return self._markers

    def segments(self) -> tuple[ReplaySegment, ...]:
        """Segments of the currently open recording."""
        return self._segments

    def export_jobs(self) -> tuple[ExportJobSnapshot, ...]:
        """Snapshots of all known export jobs."""
        return self._export_service.jobs()

    def refresh_recordings(self) -> Future[tuple[ReplayRecordingSummary, ...]]:
        """Rescan storage for recordings; resolves with the new catalogue."""
        return self._submit_command(self._refresh_recordings_worker)

    def open_recording(self, recording_path: str | Path) -> Future[ReplaySnapshot]:
        """Open a recording for playback; resolves with a state snapshot."""
        return self._submit_command(self._open_recording_worker, recording_path)

    def play(self) -> Future[None]:
        return self._submit_command(self._play_worker)

    def pause(self) -> Future[None]:
        return self._submit_command(self._pause_worker)

    def stop(self) -> Future[None]:
        return self._submit_command(self._stop_worker)

    def set_speed(self, multiplier: float) -> Future[float]:
        """Set the playback speed; resolves with the applied multiplier."""
        return self._submit_command(self._set_speed_worker, multiplier)

    def start_export(self, format_id: str) -> Future[ExportJobSnapshot]:
        """Enqueue an export of the open recording in the given format."""
        return self._submit_command(self._start_export_worker, format_id)

    def _run(self) -> None:
        """Worker loop: drain commands, then advance playback when playing."""
        exit_error: Exception | None = None
        while True:
            try:
                if self._drain_commands():
                    return
                if self._state != "playing":
                    time.sleep(0.01)
                    continue
                self._tick_playback()
            except Exception as exc:
                logger.exception("Replay worker exited with an unexpected error")
                exit_error = exc
                break

        # Record the failure so the next shutdown() can re-raise it.
        with self._lock:
            self._worker_exit_error = exit_error
            self._started = False
            self._accepting_commands = False

    def _drain_commands(self) -> bool:
        """Run all queued commands; return True when shutdown was requested."""
        while True:
            try:
                item = self._command_queue.get_nowait()
            except queue.Empty:
                return False

            if item is self._shutdown_sentinel:
                # Commands queued behind the sentinel will never run; fail
                # their futures so callers do not hang forever (mirrors
                # RecordingBackend's shutdown behavior).
                self._fail_pending_commands("REPLAY_SHUTTING_DOWN")
                return True

            command, _future = item
            command()

    def _refresh_recordings_worker(self) -> tuple[ReplayRecordingSummary, ...]:
        """Rebuild the catalogue from recording manifests on disk."""
        root_dir = resolved_storage_root_dir(self._settings)
        summaries: list[ReplayRecordingSummary] = []
        for manifest in list_recordings(root_dir):
            recording_id = manifest.get("recording_id")
            # Skip manifests without a usable id.
            if not isinstance(recording_id, str) or recording_id == "":
                continue
            stream_ids_payload = manifest.get("stream_ids", [])
            stream_ids = tuple(
                stream_id for stream_id in stream_ids_payload if isinstance(stream_id, str)
            )
            label = manifest.get("recording_label")
            summaries.append(
                ReplayRecordingSummary(
                    recording_id=recording_id,
                    recording_label=label
                    if isinstance(label, str) or label is None
                    else str(label),
                    recording_path=str(root_dir / "recordings" / recording_id),
                    stream_ids=stream_ids,
                )
            )
        self._recordings = tuple(summaries)
        return self._recordings

    def _open_recording_worker(self, recording_path: str | Path) -> ReplaySnapshot:
        """Load a recording, reset playback state, and rebuild the bus."""
        logger.info("Opening replay recording from %s", recording_path)
        reader = RecordingReader(recording_path)
        self._reader = reader
        self._markers = reader.markers()
        self._segments = reader.segments()
        self._position_ns = 0
        self._speed_multiplier = 1.0
        self._timeline_index = 0
        self._play_started_wall_ns = 0
        self._play_started_position_ns = 0
        self._rebuild_bus(reader)
        self._set_state("ready")
        logger.debug(
            "Replay recording opened: recording_id=%s duration_ns=%s stream_count=%s",
            reader.recording_id,
            reader.duration_ns,
            len(reader.descriptors()),
        )
        return self.snapshot()

    def _play_worker(self) -> None:
        """Begin or resume playback; restarts from zero when finished."""
        reader = self._require_reader()
        if not reader.frames():
            raise RuntimeError("REPLAY_EMPTY_RECORDING")
        if self._state == "playing":
            return
        if self._state == "finished":
            self._position_ns = 0
            self._timeline_index = 0
        self._play_started_wall_ns = time.monotonic_ns()
        self._play_started_position_ns = self._position_ns
        self._set_state("playing")

    def _pause_worker(self) -> None:
        """Freeze playback at the current position."""
        if self._state != "playing":
            raise RuntimeError("REPLAY_NOT_PLAYING")
        self._sync_playback_position(time.monotonic_ns())
        self._set_state("paused")

    def _stop_worker(self) -> None:
        """Return playback to position zero while keeping the recording open."""
        self._require_reader()
        self._position_ns = 0
        self._timeline_index = 0
        self._play_started_wall_ns = 0
        self._play_started_position_ns = 0
        self._set_state("ready")

    def _set_speed_worker(self, multiplier: object) -> float:
        """Apply a playback speed; only 1x, 2x, and 4x are accepted."""
        try:
            resolved = float(multiplier)
        except (TypeError, ValueError) as exc:
            raise RuntimeError("REPLAY_INVALID_SPEED") from exc
        if resolved not in {1.0, 2.0, 4.0}:
            raise RuntimeError("REPLAY_INVALID_SPEED")
        if self._state == "playing":
            # Re-anchor so the speed change applies from "now" rather than
            # retroactively rescaling elapsed playback.
            current_time_ns = time.monotonic_ns()
            self._sync_playback_position(current_time_ns)
            self._play_started_wall_ns = current_time_ns
            self._play_started_position_ns = self._position_ns
        self._speed_multiplier = resolved
        return self._speed_multiplier

    def _start_export_worker(self, format_id: object) -> ExportJobSnapshot:
        """Validate the format id and hand the export to the export service."""
        reader = self._require_reader()
        if not isinstance(format_id, str) or format_id.strip() == "":
            raise RuntimeError("REPLAY_EXPORT_FORMAT_UNSUPPORTED")
        export_root_dir = resolved_export_root_dir(self._settings)
        return self._export_service.enqueue(reader, format_id, export_root_dir)

    def _tick_playback(self) -> None:
        """Advance one playback step; flip to finished at end of timeline."""
        current_time_ns = time.monotonic_ns()
        target_position_ns = self._sync_playback_position(current_time_ns)
        reader = self._reader
        if reader is None:
            return
        if (
            self._timeline_index >= len(reader.frames())
            and target_position_ns >= reader.duration_ns
        ):
            self._position_ns = reader.duration_ns
            self._set_state("finished")
            return
        time.sleep(0.005)

    def _sync_playback_position(self, current_time_ns: int) -> int:
        """Derive the target position from elapsed time and emit due frames."""
        reader = self._reader
        if reader is None:
            return self._position_ns
        target_position_ns = min(
            reader.duration_ns,
            int(
                self._play_started_position_ns
                + (current_time_ns - self._play_started_wall_ns) * self._speed_multiplier
            ),
        )
        self._emit_due_frames(target_position_ns)
        self._position_ns = target_position_ns
        return target_position_ns

    def _emit_due_frames(self, target_position_ns: int) -> None:
        """Publish every frame whose timestamp is at or before the target."""
        reader = self._reader
        if reader is None:
            return
        timeline = reader.frames()
        while self._timeline_index < len(timeline):
            ref = timeline[self._timeline_index]
            if ref.relative_timestamp_ns > target_position_ns:
                break
            self.bus.ingest_frame(reader.load_frame(ref))
            self._timeline_index += 1

    def _rebuild_bus(self, reader: RecordingReader) -> None:
        """Replace the bus descriptors with the opened recording's streams."""
        for stream_id in tuple(self.bus.descriptors()):
            self.bus.remove_descriptor(stream_id)
        self.bus.add_descriptors(reader.descriptors().values())

    def _set_state(self, state: str) -> None:
        with self._lock:
            if self._state == state:
                return
            self._state = state

    def _require_reader(self) -> RecordingReader:
        if self._reader is None:
            raise RuntimeError("REPLAY_NOT_OPEN")
        return self._reader

    def _submit_command(
        self,
        worker_method: Callable[..., object],
        *args: object,
    ) -> Future[object]:
        """Queue *worker_method(*args)* for the worker; return its future."""
        future: Future[object] = Future()
        if not self.is_started:
            future.set_exception(RuntimeError("REPLAY_NOT_STARTED"))
            return future
        if not self._accepting_commands:
            future.set_exception(RuntimeError("REPLAY_SHUTTING_DOWN"))
            return future

        def _run_command() -> None:
            try:
                result = worker_method(*args)
            except Exception as exc:
                if not future.done():
                    future.set_exception(exc)
                return
            if not future.done():
                future.set_result(result)

        self._command_queue.put((_run_command, future))
        return future

    def _fail_pending_commands(self, message: str) -> None:
        """Resolve every queued command future with a RuntimeError(message)."""
        while True:
            try:
                item = self._command_queue.get_nowait()
            except queue.Empty:
                return
            if item is self._shutdown_sentinel:
                continue
            _command, future = item
            if future is None or future.done():
                continue
            future.set_exception(RuntimeError(message))

    def _finalize_shutdown(self) -> None:
        """Reset state after the worker has stopped.

        _accepting_commands is deliberately left True so a subsequent
        start() works; submissions are still rejected while stopped
        because is_started gates them first.
        """
        self._set_state("idle")
        self._started = False
        self._accepting_commands = True
        self._thread = None

    def _take_worker_exit_error(self) -> Exception | None:
        """Atomically read and clear the stored worker exit error."""
        with self._lock:
            error = self._worker_exit_error
            self._worker_exit_error = None
            return error
ReplayBackend( *, settings: SettingsStore, parent: object | None = None)
32 def __init__( 33 self, 34 *, 35 settings: SettingsStore, 36 parent: object | None = None, 37 ) -> None: 38 self._settings = settings 39 self._parent = parent 40 self.bus = StreamBus(event_broker=BackendEventBroker(), parent=self) 41 self._export_service = ExportService() 42 self._command_queue: queue.Queue[ReplayCommand | object] = queue.Queue() 43 self._shutdown_sentinel = object() 44 self._thread: threading.Thread | None = None 45 self._state = "idle" 46 self._started = False 47 self._accepting_commands = False 48 self._worker_exit_error: Exception | None = None 49 self._lock = threading.RLock() 50 self._recordings: tuple[ReplayRecordingSummary, ...] = () 51 self._reader: RecordingReader | None = None 52 self._markers: tuple[ReplayMarker, ...] = () 53 self._segments: tuple[ReplaySegment, ...] = () 54 self._position_ns = 0 55 self._speed_multiplier = 1.0 56 self._timeline_index = 0 57 self._play_started_wall_ns = 0 58 self._play_started_position_ns = 0
def
start(self) -> None:
65 def start(self) -> None: 66 if self.is_started: 67 self._started = True 68 self._accepting_commands = True 69 return 70 logger.info("Starting replay backend") 71 self._export_service.start() 72 thread = threading.Thread( 73 target=self._run, 74 name="modlink.replay", 75 daemon=True, 76 ) 77 self._thread = thread 78 self._worker_exit_error = None 79 self._accepting_commands = True 80 self._started = True 81 thread.start() 82 self.refresh_recordings()
def
shutdown(self, *, timeout_ms: int = 3000) -> None:
84 def shutdown(self, *, timeout_ms: int = 3000) -> None: 85 logger.info("Shutting down replay backend") 86 with self._lock: 87 self._accepting_commands = False 88 thread = self._thread 89 worker_exit_error = self._worker_exit_error 90 91 if thread is None or not thread.is_alive(): 92 self._finalize_shutdown() 93 self._export_service.shutdown(timeout_ms=timeout_ms) 94 if worker_exit_error is not None: 95 self._worker_exit_error = None 96 raise worker_exit_error 97 return 98 99 self._command_queue.put(self._shutdown_sentinel) 100 thread.join(max(0, timeout_ms) / 1000) 101 if thread.is_alive(): 102 raise TimeoutError(f"replay shutdown timed out after {timeout_ms}ms") 103 104 self._finalize_shutdown() 105 self._export_service.shutdown(timeout_ms=timeout_ms) 106 worker_exit_error = self._take_worker_exit_error() 107 if worker_exit_error is not None: 108 raise worker_exit_error
110 def snapshot(self) -> ReplaySnapshot: 111 reader = self._reader 112 return ReplaySnapshot( 113 state=self._state, 114 is_started=self.is_started, 115 recording_id=None if reader is None else reader.recording_id, 116 recording_path=None if reader is None else str(reader.recording_path), 117 position_ns=int(self._position_ns), 118 duration_ns=0 if reader is None else int(reader.duration_ns), 119 speed_multiplier=float(self._speed_multiplier), 120 )
def
refresh_recordings( self) -> concurrent.futures.Future[tuple[ReplayRecordingSummary, ...]]:
def
open_recording( self, recording_path: str | pathlib.Path) -> concurrent.futures.Future[ReplaySnapshot]:
@dataclass(frozen=True, slots=True)
class
ReplayMarker:
@dataclass(frozen=True, slots=True)
class
ReplayRecordingSummary:
@dataclass(frozen=True, slots=True)
class ReplayRecordingSummary:
    """Immutable catalogue entry describing one stored recording."""

    # Identifier of the recording.
    recording_id: str
    # Optional human-readable label from the recording manifest.
    recording_label: str | None
    # Filesystem path of the recording directory.
    recording_path: str
    # Ids of the streams captured in the recording.
    stream_ids: tuple[str, ...]
@dataclass(frozen=True, slots=True)
class
ReplaySegment:
@dataclass(frozen=True, slots=True)
class
ReplaySnapshot:
42@dataclass(frozen=True, slots=True) 43class ReplaySnapshot: 44 state: str 45 is_started: bool 46 recording_id: str | None 47 recording_path: str | None 48 position_ns: int 49 duration_ns: int 50 speed_multiplier: float
@dataclass(frozen=True, slots=True)
class
ExportJobSnapshot:
74@dataclass(frozen=True, slots=True) 75class ExportJobSnapshot: 76 job_id: str 77 recording_id: str 78 format_id: str 79 state: str 80 progress: float 81 output_path: str | None 82 error: str | None
class
BackendEventBroker:
class BackendEventBroker:
    """Fans low-frequency backend events out to every open EventStream."""

    def __init__(self) -> None:
        self._streams: list[EventStream] = []
        self._lock = RLock()

    def open_stream(self, *, maxsize: int = 1024) -> EventStream:
        """Create, register, and return a new event stream."""
        new_stream = EventStream(self, maxsize=maxsize)
        with self._lock:
            self._streams.append(new_stream)
        return new_stream

    def publish(self, event: BackendEvent) -> None:
        """Deliver *event* to every currently-registered stream."""
        # Snapshot under the lock so fan-out is safe against concurrent
        # open/close of streams.
        with self._lock:
            subscribers = tuple(self._streams)
        for subscriber in subscribers:
            subscriber._publish(event)

    def _close_stream(self, stream: EventStream) -> None:
        # Detach the stream (tolerating double-close) and signal its readers.
        with self._lock:
            try:
                self._streams.remove(stream)
            except ValueError:
                pass
            stream._push_closed()
@dataclass(frozen=True, slots=True)
class
DriverConnectionLostEvent:
8@dataclass(frozen=True, slots=True) 9class DriverConnectionLostEvent: 10 driver_id: str 11 detail: object | None = None 12 kind: Literal["driver_connection_lost"] = "driver_connection_lost"
@dataclass(frozen=True, slots=True)
class
DriverExecutorFailedEvent:
15@dataclass(frozen=True, slots=True) 16class DriverExecutorFailedEvent: 17 driver_id: str 18 detail: object 19 kind: Literal["driver_executor_failed"] = "driver_executor_failed"
class
EventStream:
class EventStream:
    """Bounded queue of backend events fed by a BackendEventBroker.

    Publishing never blocks: when the queue is full the oldest entry is
    evicted and a single overflow marker is enqueued, so readers raise
    EventStreamOverflowError. Closing enqueues a closed marker, so readers
    raise StreamClosedError.
    """

    def __init__(
        self,
        broker: BackendEventBroker,
        *,
        maxsize: int = 1024,
    ) -> None:
        # Clamp to at least one slot so markers can always be queued.
        normalized_maxsize = max(1, int(maxsize))
        self._broker = broker
        self._queue: queue.Queue[BackendEvent | object] = queue.Queue(maxsize=normalized_maxsize)
        self._lock = RLock()
        self._closed = False

    @property
    def closed(self) -> bool:
        return self._closed

    def read(
        self,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> BackendEvent:
        """Return the next event.

        Raises StreamClosedError once the closed marker is reached and
        EventStreamOverflowError when an overflow marker is reached.
        """
        item = self._queue.get(block=block, timeout=timeout)
        if item is _STREAM_CLOSED:
            raise StreamClosedError("event stream is closed")
        if item is _STREAM_OVERFLOW:
            raise EventStreamOverflowError("event stream overflowed")
        return item  # type: ignore[return-value]

    def read_many(self, *, max_items: int | None = None) -> list[BackendEvent]:
        """Drain up to *max_items* queued events without blocking.

        When a closed/overflow marker is hit after some events were drained,
        the events are returned and the marker is re-queued so the NEXT read
        still observes it. (Previously the marker was silently consumed,
        which lost the terminal signal and could leave a later blocking
        read() hung forever.) With no drained events, the marker raises
        immediately.
        """
        if max_items is not None and max_items <= 0:
            return []

        items: list[BackendEvent] = []
        while max_items is None or len(items) < max_items:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                break
            if item is _STREAM_CLOSED or item is _STREAM_OVERFLOW:
                if items:
                    # BUGFIX: preserve the terminal marker for the next read.
                    # A slot is free because we just removed the marker.
                    try:
                        self._queue.put_nowait(item)
                    except queue.Full:
                        pass
                    return items
                if item is _STREAM_CLOSED:
                    raise StreamClosedError("event stream is closed")
                raise EventStreamOverflowError("event stream overflowed")
            items.append(item)  # type: ignore[arg-type]
        return items

    def close(self) -> None:
        """Idempotently close the stream and detach it from the broker."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
        # Call the broker OUTSIDE our lock: the broker publishes while
        # holding its own lock and then takes stream locks, so calling back
        # while holding this lock risks lock-order inversion (deadlock).
        self._broker._close_stream(self)

    def _publish(self, event: BackendEvent) -> None:
        # Called by the broker; must never block the publisher.
        with self._lock:
            if self._closed:
                return
            try:
                self._queue.put_nowait(event)
                return
            except queue.Full:
                pass
            # Queue full: evict the oldest entry and record the overflow.
            self._evict_and_put(_STREAM_OVERFLOW)

    def _push_closed(self) -> None:
        with self._lock:
            try:
                self._queue.put_nowait(_STREAM_CLOSED)
                return
            except queue.Full:
                pass
            self._evict_and_put(_STREAM_CLOSED)

    def _evict_and_put(self, marker: object) -> None:
        # Drop one (oldest) queued entry, then best-effort enqueue *marker*.
        try:
            self._queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self._queue.put_nowait(marker)
        except queue.Full:
            pass
EventStream( broker: BackendEventBroker, *, maxsize: int = 1024)
23 def __init__( 24 self, 25 broker: BackendEventBroker, 26 *, 27 maxsize: int = 1024, 28 ) -> None: 29 normalized_maxsize = max(1, int(maxsize)) 30 self._broker = broker 31 self._queue: queue.Queue[BackendEvent | object] = queue.Queue(maxsize=normalized_maxsize) 32 self._lock = RLock() 33 self._closed = False
def
read( self, *, block: bool = True, timeout: float | None = None) -> BackendEvent:
39 def read( 40 self, 41 *, 42 block: bool = True, 43 timeout: float | None = None, 44 ) -> BackendEvent: 45 item = self._queue.get(block=block, timeout=timeout) 46 if item is _STREAM_CLOSED: 47 raise StreamClosedError("event stream is closed") 48 if item is _STREAM_OVERFLOW: 49 raise EventStreamOverflowError("event stream overflowed") 50 return item # type: ignore[return-value]
def
read_many(self, *, max_items: int | None = None) -> list[BackendEvent]:
52 def read_many(self, *, max_items: int | None = None) -> list[BackendEvent]: 53 if max_items is not None and max_items <= 0: 54 return [] 55 56 items: list[BackendEvent] = [] 57 while max_items is None or len(items) < max_items: 58 try: 59 item = self._queue.get_nowait() 60 except queue.Empty: 61 break 62 if item is _STREAM_CLOSED: 63 if items: 64 return items 65 raise StreamClosedError("event stream is closed") 66 if item is _STREAM_OVERFLOW: 67 if items: 68 return items 69 raise EventStreamOverflowError("event stream overflowed") 70 items.append(item) # type: ignore[arg-type] 71 return items
class
EventStreamOverflowError(builtins.Exception):
class EventStreamOverflowError(Exception):
    """Signals that an event stream dropped entries because its queue filled up."""
Raised when the low-frequency event stream overflows.
class
FrameStream:
class FrameStream:
    """Bounded per-consumer frame queue fed by a StreamBus.

    Drop policy:
      * "drop_oldest" — on overflow the oldest frame is evicted to make room.
      * "error"       — on overflow the queue is cleared and an overflow
        marker is enqueued; readers then raise FrameStreamOverflowError.
    """

    def __init__(
        self,
        bus: StreamBus,
        *,
        maxsize: int = 256,
        drop_policy: FrameDropPolicy = "drop_oldest",
        consumer_name: str,
    ) -> None:
        if drop_policy not in {"drop_oldest", "error"}:
            raise ValueError(f"unsupported frame drop policy: {drop_policy}")
        # Clamp to at least one slot so markers can always be queued.
        normalized_maxsize = max(1, int(maxsize))
        self._bus = bus
        self._queue: queue.Queue[FrameEnvelope | object] = queue.Queue(maxsize=normalized_maxsize)
        self._drop_policy = drop_policy
        self._consumer_name = consumer_name
        self._dropped_count = 0
        self._closed = False
        self._overflowed = False
        self._lock = RLock()

    @property
    def consumer_name(self) -> str:
        return self._consumer_name

    @property
    def dropped_count(self) -> int:
        # Number of frames this consumer has lost to back-pressure.
        return self._dropped_count

    @property
    def overflowed(self) -> bool:
        return self._overflowed

    @property
    def closed(self) -> bool:
        return self._closed

    def read(
        self,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> FrameEnvelope:
        """Return the next frame.

        Raises StreamClosedError at the closed marker and
        FrameStreamOverflowError at the overflow marker.
        """
        item = self._queue.get(block=block, timeout=timeout)
        if item is _FRAME_STREAM_CLOSED:
            raise StreamClosedError("frame stream is closed")
        if item is _FRAME_STREAM_OVERFLOW:
            raise FrameStreamOverflowError(self._consumer_name)
        return item  # type: ignore[return-value]

    def read_many(self, *, max_items: int | None = None) -> list[FrameEnvelope]:
        """Drain up to *max_items* queued frames without blocking.

        When a closed/overflow marker is hit after some frames were drained,
        the frames are returned and the marker is re-queued so the NEXT read
        still observes it. (Previously the marker was silently consumed,
        losing the terminal signal — same fix as EventStream.read_many.)
        """
        if max_items is not None and max_items <= 0:
            return []

        items: list[FrameEnvelope] = []
        while max_items is None or len(items) < max_items:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                break
            if item is _FRAME_STREAM_CLOSED or item is _FRAME_STREAM_OVERFLOW:
                if items:
                    # BUGFIX: preserve the terminal marker for the next read.
                    # A slot is free because we just removed the marker.
                    try:
                        self._queue.put_nowait(item)
                    except queue.Full:
                        pass
                    return items
                if item is _FRAME_STREAM_CLOSED:
                    raise StreamClosedError("frame stream is closed")
                raise FrameStreamOverflowError(self._consumer_name)
            items.append(item)  # type: ignore[arg-type]
        return items

    def close(self) -> None:
        """Idempotently close the stream and detach it from the bus."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
        # Call the bus OUTSIDE our lock: ingest_frame takes the bus lock and
        # then stream locks, so calling back while holding this lock risks
        # lock-order inversion (deadlock).
        self._bus._close_frame_stream(self)

    def _publish(self, frame: FrameEnvelope) -> None:
        # Called by the bus; must never block the producer.
        with self._lock:
            if self._closed:
                return
            if self._overflowed and self._drop_policy == "error":
                # Already in the overflowed terminal state: count and drop.
                self._dropped_count += 1
                return
            try:
                self._queue.put_nowait(frame)
                return
            except queue.Full:
                self._dropped_count += 1

            if self._drop_policy == "drop_oldest":
                self._drop_oldest_and_enqueue(frame)
                return

            # "error" policy: enter the overflowed state and leave only the
            # overflow marker queued for the reader.
            self._overflowed = True
            self._clear_queue_locked()
            self._push_control_locked(_FRAME_STREAM_OVERFLOW)

    def _drop_oldest_and_enqueue(self, frame: FrameEnvelope) -> None:
        # Evict one frame, then best-effort enqueue the new one.
        try:
            self._queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self._queue.put_nowait(frame)
        except queue.Full:
            pass

    def _clear_queue_locked(self) -> None:
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break

    def _push_closed(self) -> None:
        with self._lock:
            self._push_control_locked(_FRAME_STREAM_CLOSED)

    def _push_control_locked(self, item: object) -> None:
        # Best-effort enqueue of a control marker, evicting one entry if full.
        try:
            self._queue.put_nowait(item)
            return
        except queue.Full:
            pass

        try:
            self._queue.get_nowait()
        except queue.Empty:
            pass

        try:
            self._queue.put_nowait(item)
        except queue.Full:
            pass
FrameStream( bus: StreamBus, *, maxsize: int = 256, drop_policy: Literal['drop_oldest', 'error'] = 'drop_oldest', consumer_name: str)
27 def __init__( 28 self, 29 bus: StreamBus, 30 *, 31 maxsize: int = 256, 32 drop_policy: FrameDropPolicy = "drop_oldest", 33 consumer_name: str, 34 ) -> None: 35 if drop_policy not in {"drop_oldest", "error"}: 36 raise ValueError(f"unsupported frame drop policy: {drop_policy}") 37 normalized_maxsize = max(1, int(maxsize)) 38 self._bus = bus 39 self._queue: queue.Queue[FrameEnvelope | object] = queue.Queue(maxsize=normalized_maxsize) 40 self._drop_policy = drop_policy 41 self._consumer_name = consumer_name 42 self._dropped_count = 0 43 self._closed = False 44 self._overflowed = False 45 self._lock = RLock()
63 def read( 64 self, 65 *, 66 block: bool = True, 67 timeout: float | None = None, 68 ) -> FrameEnvelope: 69 item = self._queue.get(block=block, timeout=timeout) 70 if item is _FRAME_STREAM_CLOSED: 71 raise StreamClosedError("frame stream is closed") 72 if item is _FRAME_STREAM_OVERFLOW: 73 raise FrameStreamOverflowError(self._consumer_name) 74 return item # type: ignore[return-value]
76 def read_many(self, *, max_items: int | None = None) -> list[FrameEnvelope]: 77 if max_items is not None and max_items <= 0: 78 return [] 79 80 items: list[FrameEnvelope] = [] 81 while max_items is None or len(items) < max_items: 82 try: 83 item = self._queue.get_nowait() 84 except queue.Empty: 85 break 86 if item is _FRAME_STREAM_CLOSED: 87 if items: 88 return items 89 raise StreamClosedError("frame stream is closed") 90 if item is _FRAME_STREAM_OVERFLOW: 91 if items: 92 return items 93 raise FrameStreamOverflowError(self._consumer_name) 94 items.append(item) # type: ignore[arg-type] 95 return items
class
FrameStreamOverflowError(builtins.Exception):
class FrameStreamOverflowError(Exception):
    """Signals that a frame stream overflowed under the "error" drop policy."""

    def __init__(self, consumer_name: str) -> None:
        # Remember which consumer overflowed so callers can identify it.
        self.consumer_name = consumer_name
        message = f"frame stream overflowed: consumer_name={consumer_name}"
        super().__init__(message)
Common base class for all non-exit exceptions.
def
configure_host_logging( *, log_path: str | pathlib._local.Path | None = None, app_name: str = 'ModLink Studio', app_author: str = 'ModLink', log_filename: str = 'modlink.log', console: bool = True, debug: bool = False, max_bytes: int = 5242880, backup_count: int = 5) -> pathlib._local.Path:
def configure_host_logging(
    *,
    log_path: str | Path | None = None,
    app_name: str = "ModLink Studio",
    app_author: str = "ModLink",
    log_filename: str = "modlink.log",
    console: bool = True,
    debug: bool = False,
    max_bytes: int = DEFAULT_MAX_LOG_BYTES,
    backup_count: int = DEFAULT_BACKUP_COUNT,
) -> Path:
    """Install rotating-file (and optional stderr) logging; return the log path.

    When *log_path* is omitted, the platform-specific user log directory is
    used. Existing managed handlers are replaced, so the call is idempotent.
    """
    if log_path is not None:
        resolved_path = Path(log_path)
    else:
        resolved_path = Path(user_log_path(app_name, app_author)) / log_filename
    resolved_path.parent.mkdir(parents=True, exist_ok=True)

    root_logger = logging.getLogger()
    _remove_managed_handlers(root_logger)
    # Root stays at WARNING; the managed loggers below get the chosen level.
    root_logger.setLevel(logging.WARNING)
    log_level = logging.DEBUG if debug else logging.INFO

    formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
    file_handler = RotatingFileHandler(
        resolved_path,
        maxBytes=max(1, int(max_bytes)),
        backupCount=max(1, int(backup_count)),
        encoding="utf-8",
    )
    file_handler.setLevel(log_level)
    file_handler.setFormatter(formatter)
    _mark_managed(file_handler)
    root_logger.addHandler(file_handler)

    if console:
        console_handler = logging.StreamHandler(sys.stderr)
        console_handler.setLevel(log_level)
        console_handler.setFormatter(formatter)
        _mark_managed(console_handler)
        root_logger.addHandler(console_handler)

    # Renamed loop variable so it does not shadow any module-level `logger`.
    for logger_name in _MANAGED_LOGGER_NAMES:
        managed_logger = logging.getLogger(logger_name)
        managed_logger.setLevel(log_level)
        managed_logger.propagate = True

    logging.captureWarnings(True)
    logging.getLogger(__name__).info(
        "Configured logging at %s (level=%s)",
        resolved_path,
        logging.getLevelName(log_level),
    )
    return resolved_path
@dataclass(frozen=True, slots=True)
class
DriverSnapshot:
7@dataclass(frozen=True, slots=True) 8class DriverSnapshot: 9 driver_id: str 10 display_name: str 11 supported_providers: tuple[str, ...] 12 is_running: bool 13 is_connected: bool 14 is_streaming: bool
class
ModLinkEngine:
class ModLinkEngine:
    """Application engine that owns shared services and driver threads.

    Construction wires up settings, the event broker, the stream bus, the
    recording/replay backends, and one portal per discovered driver factory.
    Any startup failure triggers a best-effort rollback of everything that
    was already attached, then re-raises.
    """

    def __init__(
        self,
        *,
        settings_path: str | Path | None = None,
        settings_version: int = 1,
        parent: object | None = None,
    ) -> None:
        driver_factories = tuple(discover_driver_factories())
        logger.info(
            "Starting ModLink engine with %d driver factories", len(driver_factories)
        )
        self._parent = parent
        self._event_broker = BackendEventBroker()
        self._settings = SettingsStore(
            path=_resolve_settings_path(settings_path),
            version=settings_version,
            on_change=self._publish_setting_changed,
        )
        declare_core_settings(self._settings)
        # Only load persisted settings when a file actually exists.
        if self._settings.path is not None and self._settings.path.exists():
            self._settings.load(ignore_unknown=True)
        self.bus = StreamBus(event_broker=self._event_broker, parent=self)
        self._recording = RecordingBackend(
            self.bus,
            settings=self._settings,
            publish_event=self._event_broker.publish,
            parent=self,
        )
        self._replay = ReplayBackend(
            settings=self._settings,
            parent=self,
        )
        self._driver_portals: dict[str, DriverPortal] = {}
        started_portals: list[DriverPortal] = []
        try:
            for driver_factory in driver_factories:
                started_portals.append(self._attach_driver(driver_factory))
            self._recording.start()
            self._replay.start()
            logger.info("ModLink engine started with %d driver portals", len(self._driver_portals))
        except Exception as exc:
            logger.exception("Engine startup failed; rolling back attached services")
            self._rollback_startup(started_portals, exc)
            raise

    @property
    def settings(self) -> SettingsStore:
        return self._settings

    @property
    def recording(self) -> RecordingBackend:
        return self._recording

    @property
    def replay(self) -> ReplayBackend:
        return self._replay

    def recording_snapshot(self) -> RecordingSnapshot:
        return self._recording.snapshot()

    def replay_snapshot(self) -> ReplaySnapshot:
        return self._replay.snapshot()

    def driver_portals(self) -> tuple[DriverPortal, ...]:
        return tuple(self._driver_portals.values())

    def driver_snapshots(self) -> tuple[DriverSnapshot, ...]:
        return tuple(p.snapshot() for p in self._driver_portals.values())

    def driver_portal(self, driver_id: str) -> DriverPortal | None:
        return self._driver_portals.get(driver_id)

    def settings_snapshot(self) -> dict[str, Any]:
        return self._settings.snapshot()

    def open_event_stream(self, *, maxsize: int = 1024) -> EventStream:
        return self._event_broker.open_stream(maxsize=maxsize)

    def _publish_setting_changed(self, key: str, value: Any) -> None:
        # Settings-store callback: forward changes onto the event broker.
        self._event_broker.publish(
            SettingChangedEvent(key=key, value=value, ts=time.time()),
        )

    def _attach_driver(self, driver_factory: DriverFactory) -> DriverPortal:
        """Create, register, and start one driver portal; rolls back its descriptors on failure."""
        portal = DriverPortal(
            driver_factory,
            publish_event=self._event_broker.publish,
            frame_sink=self.bus.ingest_frame,
            parent=self,
        )
        driver_id = portal.driver_id.strip()
        if driver_id in self._driver_portals:
            raise ValueError(f"driver_id '{driver_id}' is already installed")

        descriptors = portal.descriptors()
        self.bus.add_descriptors(descriptors)
        try:
            portal.start(timeout_ms=DEFAULT_DRIVER_STARTUP_TIMEOUT_MS)
        except Exception:
            # Undo the descriptor registrations we just made.
            for descriptor in descriptors:
                self.bus.remove_descriptor(descriptor.stream_id)
            raise

        self._driver_portals[driver_id] = portal
        return portal

    def _rollback_startup(
        self,
        attached_portals: list[DriverPortal],
        startup_error: Exception,
    ) -> None:
        """Best-effort teardown after a failed startup; failures become notes on *startup_error*."""
        failure_notes: list[str] = []
        for portal in reversed(attached_portals):
            try:
                portal.stop()
            except Exception as exc:
                logger.exception(
                    "Engine startup rollback failed while stopping driver '%s'", portal.driver_id
                )
                failure_notes.append(
                    f"cleanup failed while stopping driver "
                    f"'{portal.driver_id}': {type(exc).__name__}: {exc}"
                )
            for descriptor in portal.descriptors():
                self.bus.remove_descriptor(descriptor.stream_id)

        self._driver_portals.clear()
        try:
            self._recording.shutdown()
        except Exception as exc:
            logger.exception("Engine startup rollback failed while shutting down recording")
            failure_notes.append(
                f"cleanup failed while shutting down recording: {type(exc).__name__}: {exc}"
            )
        try:
            self._replay.shutdown()
        except Exception as exc:
            logger.exception("Engine startup rollback failed while shutting down replay")
            failure_notes.append(
                f"cleanup failed while shutting down replay: {type(exc).__name__}: {exc}"
            )

        for note in failure_notes:
            startup_error.add_note(note)

    def shutdown(self) -> None:
        """Stop every portal and backend; re-raise the first failure at the end."""
        logger.info("Shutting down ModLink engine")
        first_error: Exception | None = None
        for portal in self._driver_portals.values():
            try:
                portal.stop()
            except Exception as exc:
                logger.exception(
                    "Engine shutdown failed while stopping driver '%s'", portal.driver_id
                )
                if first_error is None:
                    first_error = exc
        self._driver_portals.clear()
        try:
            self._recording.shutdown()
        except Exception as exc:
            logger.exception("Engine shutdown failed while shutting down recording")
            if first_error is None:
                first_error = exc
        try:
            self._replay.shutdown()
        except Exception as exc:
            logger.exception("Engine shutdown failed while shutting down replay")
            if first_error is None:
                first_error = exc
        if first_error is not None:
            raise first_error
        logger.info("ModLink engine shut down")
Application engine that owns shared services and driver threads.
ModLinkEngine( *, settings_path: str | pathlib._local.Path | None = None, settings_version: int = 1, parent: object | None = None)
31 def __init__( 32 self, 33 *, 34 settings_path: str | Path | None = None, 35 settings_version: int = 1, 36 parent: object | None = None, 37 ) -> None: 38 resolved_driver_factories = tuple(discover_driver_factories()) 39 logger.info( 40 "Starting ModLink engine with %d driver factories", len(resolved_driver_factories) 41 ) 42 self._parent = parent 43 self._event_broker = BackendEventBroker() 44 self._settings = SettingsStore( 45 path=_resolve_settings_path(settings_path), 46 version=settings_version, 47 on_change=self._publish_setting_changed, 48 ) 49 declare_core_settings(self._settings) 50 if self._settings.path is not None and self._settings.path.exists(): 51 self._settings.load(ignore_unknown=True) 52 self.bus = StreamBus(event_broker=self._event_broker, parent=self) 53 self._recording = RecordingBackend( 54 self.bus, 55 settings=self._settings, 56 publish_event=self._event_broker.publish, 57 parent=self, 58 ) 59 self._replay = ReplayBackend( 60 settings=self._settings, 61 parent=self, 62 ) 63 self._driver_portals: dict[str, DriverPortal] = {} 64 attached_portals: list[DriverPortal] = [] 65 try: 66 for factory in resolved_driver_factories: 67 portal = self._attach_driver(factory) 68 attached_portals.append(portal) 69 self._recording.start() 70 self._replay.start() 71 logger.info("ModLink engine started with %d driver portals", len(self._driver_portals)) 72 except Exception as exc: 73 logger.exception("Engine startup failed; rolling back attached services") 74 self._rollback_startup(attached_portals, exc) 75 raise
settings: SettingsStore
recording: RecordingBackend
def
shutdown(self) -> None:
177 def shutdown(self) -> None: 178 logger.info("Shutting down ModLink engine") 179 first_error: Exception | None = None 180 for portal in self._driver_portals.values(): 181 try: 182 portal.stop() 183 except Exception as exc: 184 logger.exception( 185 "Engine shutdown failed while stopping driver '%s'", portal.driver_id 186 ) 187 if first_error is None: 188 first_error = exc 189 self._driver_portals.clear() 190 try: 191 self._recording.shutdown() 192 except Exception as exc: 193 logger.exception("Engine shutdown failed while shutting down recording") 194 if first_error is None: 195 first_error = exc 196 try: 197 self._replay.shutdown() 198 except Exception as exc: 199 logger.exception("Engine shutdown failed while shutting down replay") 200 if first_error is None: 201 first_error = exc 202 if first_error is not None: 203 raise first_error 204 logger.info("ModLink engine shut down")
@dataclass(frozen=True, slots=True)
class
RecordingFailedEvent:
22@dataclass(frozen=True, slots=True) 23class RecordingFailedEvent: 24 recording_id: str 25 recording_path: str 26 frame_counts_by_stream: dict[str, int] 27 reason: str 28 ts_ns: int 29 kind: Literal["recording_failed"] = "recording_failed"
@dataclass(frozen=True, slots=True)
class
SettingChangedEvent:
32@dataclass(frozen=True, slots=True) 33class SettingChangedEvent: 34 key: str 35 value: Any 36 ts: float 37 kind: Literal["setting_changed"] = "setting_changed"
class
SettingsStore:
class SettingsStore:
    """Thread-safe, attribute-style settings tree with JSON persistence."""

    # Root-level names that would shadow the store's own public API.
    ROOT_RESERVED = frozenset({"add", "load", "path", "save", "snapshot"})

    def __init__(
        self,
        *,
        path: str | Path | None = None,
        version: int = 1,
        on_change: Any = None,
    ) -> None:
        # __setattr__ is overridden below, so bootstrap private state via
        # object.__setattr__ to avoid routing through the settings tree.
        object.__setattr__(self, "_path", None if path is None else Path(path))
        object.__setattr__(self, "_version", version)
        object.__setattr__(self, "_on_change", on_change)
        object.__setattr__(self, "_lock", RLock())
        object.__setattr__(self, "_root", SettingsNode(name="", parent=None, store=self))

    @property
    def path(self) -> Path | None:
        return self._path

    def add(self, **specs: SettingsSpec) -> SettingsStore:
        """Declare settings under the root node; returns self for chaining."""
        with self._lock:
            overlap = self.ROOT_RESERVED.intersection(specs)
            if overlap:
                # min() == first of sorted(): deterministic error message.
                reserved_name = min(overlap)
                raise ValueError(f"reserved key: {reserved_name!r}")
            self._root.add(**specs)
        return self

    def __getattr__(self, name: str) -> SettingsNode | SettingItem:
        # Only non-underscore names resolve into the settings tree.
        if name.startswith("_"):
            raise AttributeError(name)
        with self._lock:
            return self._root.get_child(name)

    def __setattr__(self, name: str, value: object) -> None:
        # Underscore names are plain instance attributes; everything else
        # writes into the settings tree.
        if name.startswith("_"):
            object.__setattr__(self, name, value)
            return
        with self._lock:
            setattr(self._root, name, value)

    def snapshot(self) -> dict[str, object]:
        """Return a plain-dict copy of the whole settings tree."""
        with self._lock:
            return self._root.dump_dict()

    def save(self, path: str | Path | None = None) -> None:
        """Atomically write the settings tree as versioned JSON."""
        with self._lock:
            resolved_path = self._resolve_path(path)
            resolved_path.parent.mkdir(parents=True, exist_ok=True)
            payload = {
                "_version": self._version,
                "values": self._root.dump_dict(),
            }
            _atomic_write_text(
                resolved_path,
                json.dumps(payload, indent=2, ensure_ascii=False),
            )

    def load(
        self,
        path: str | Path | None = None,
        *,
        ignore_unknown: bool = False,
    ) -> None:
        """Replace the tree from a JSON file, rolling back on failure.

        Change callbacks fire once per leaf whose value differs afterwards.
        Raises TypeError for non-object JSON and ValueError on a version
        mismatch.
        """
        with self._lock:
            resolved_path = self._resolve_path(path)
            payload = json.loads(resolved_path.read_text(encoding="utf-8"))
            if not isinstance(payload, dict):
                raise TypeError("settings file must contain a JSON object")

            file_version = payload.get("_version")
            if file_version is not None and file_version != self._version:
                raise ValueError(
                    f"settings version mismatch: file={file_version}, expected={self._version}"
                )

            values = payload.get("values", {})
            previous_snapshot = self.snapshot()
            previous = self._root.collect_leaf_values()
            self._root.reset(notify=False)
            try:
                self._root.apply_dict(values, notify=False, ignore_unknown=ignore_unknown)
            except Exception:
                # Restore the pre-load state before propagating the error.
                self._root.reset(notify=False)
                self._root.apply_dict(previous_snapshot, notify=False)
                raise
            current = self._root.collect_leaf_values()
            for key, value in current.items():
                if previous.get(key) != value:
                    self._emit_change(key, value)

    def _resolve_path(self, path: str | Path | None) -> Path:
        resolved = self._path if path is None else Path(path)
        if resolved is None:
            raise ValueError("path is required")
        return resolved

    def _emit_change(self, key: str, value: object) -> None:
        callback = self._on_change
        if callback is not None:
            callback(key, value)
SettingsStore( *, path: str | pathlib._local.Path | None = None, version: int = 1, on_change: Any = None)
233 def __init__( 234 self, 235 *, 236 path: str | Path | None = None, 237 version: int = 1, 238 on_change: Any = None, 239 ) -> None: 240 object.__setattr__(self, "_path", None if path is None else Path(path)) 241 object.__setattr__(self, "_version", version) 242 object.__setattr__(self, "_on_change", on_change) 243 object.__setattr__(self, "_lock", RLock()) 244 object.__setattr__(self, "_root", SettingsNode(name="", parent=None, store=self))
def
save(self, path: str | pathlib._local.Path | None = None) -> None:
276 def save(self, path: str | Path | None = None) -> None: 277 with self._lock: 278 resolved_path = self._resolve_path(path) 279 resolved_path.parent.mkdir(parents=True, exist_ok=True) 280 payload = { 281 "_version": self._version, 282 "values": self._root.dump_dict(), 283 } 284 _atomic_write_text( 285 resolved_path, 286 json.dumps(payload, indent=2, ensure_ascii=False), 287 )
def
load( self, path: str | pathlib._local.Path | None = None, *, ignore_unknown: bool = False) -> None:
289 def load( 290 self, 291 path: str | Path | None = None, 292 *, 293 ignore_unknown: bool = False, 294 ) -> None: 295 with self._lock: 296 resolved_path = self._resolve_path(path) 297 payload = json.loads(resolved_path.read_text(encoding="utf-8")) 298 if not isinstance(payload, dict): 299 raise TypeError("settings file must contain a JSON object") 300 301 file_version = payload.get("_version") 302 if file_version is not None and file_version != self._version: 303 raise ValueError( 304 f"settings version mismatch: file={file_version}, expected={self._version}" 305 ) 306 307 values = payload.get("values", {}) 308 previous_snapshot = self.snapshot() 309 previous = self._root.collect_leaf_values() 310 self._root.reset(notify=False) 311 try: 312 self._root.apply_dict(values, notify=False, ignore_unknown=ignore_unknown) 313 except Exception: 314 self._root.reset(notify=False) 315 self._root.apply_dict(previous_snapshot, notify=False) 316 raise 317 current = self._root.collect_leaf_values() 318 for key, value in current.items(): 319 if previous.get(key) != value: 320 self._emit_change(key, value)
class
StreamClosedError(builtins.Exception):
10class StreamClosedError(Exception): 11 """Raised when a blocking stream read is interrupted by stream closure."""
Raised when a blocking stream read is interrupted by stream closure.
class
StreamBus:
class StreamBus:
    """Stores stream descriptors and fans out accepted frames to frame streams."""

    # Process-wide counter for auto-generated consumer names.
    _STREAM_COUNTER = count(1)

    def __init__(
        self,
        *,
        event_broker: BackendEventBroker,
        parent: object | None = None,
    ) -> None:
        self._event_broker = event_broker
        self._parent = parent
        self._descriptors: dict[str, StreamDescriptor] = {}
        self._frame_streams: list[FrameStream] = []
        self._lock = RLock()

    def add_descriptor(self, descriptor: StreamDescriptor) -> None:
        """Register a descriptor; re-registering an identical one is a no-op."""
        current = self._descriptors.get(descriptor.stream_id)
        if current is not None:
            if current == descriptor:
                return
            raise ValueError(f"conflicting descriptor for stream_id '{descriptor.stream_id}'")
        self._descriptors[descriptor.stream_id] = descriptor

    def add_descriptors(self, descriptors: Iterable[StreamDescriptor]) -> None:
        for descriptor in descriptors:
            self.add_descriptor(descriptor)

    def remove_descriptor(self, stream_id: str) -> None:
        self._descriptors.pop(str(stream_id), None)

    def ingest_frame(self, frame: object) -> bool:
        """Fan a frame out to all consumers; return whether it was accepted.

        Frames that are not FrameEnvelope instances, or whose stream_id has
        no registered descriptor, are rejected.
        """
        if not isinstance(frame, FrameEnvelope):
            return False
        if frame.stream_id not in self._descriptors:
            return False
        # Snapshot under the lock so fan-out is safe against concurrent
        # open/close of streams.
        with self._lock:
            consumers = tuple(self._frame_streams)
        for consumer in consumers:
            consumer._publish(frame)
        return True

    def open_frame_stream(
        self,
        *,
        maxsize: int = 256,
        drop_policy: FrameDropPolicy = "drop_oldest",
        consumer_name: str | None = None,
    ) -> FrameStream:
        """Create and register a new consumer stream (auto-naming if needed)."""
        resolved_name = consumer_name or self._next_consumer_name()
        new_stream = FrameStream(
            self,
            maxsize=maxsize,
            drop_policy=drop_policy,
            consumer_name=resolved_name,
        )
        with self._lock:
            self._frame_streams.append(new_stream)
        return new_stream

    def descriptor(self, stream_id: str) -> StreamDescriptor | None:
        return self._descriptors.get(stream_id)

    def descriptors(self) -> dict[str, StreamDescriptor]:
        # Shallow copy so callers cannot mutate the registry.
        return dict(self._descriptors)

    def _close_frame_stream(self, stream: FrameStream) -> None:
        # Detach the stream (tolerating double-close) and signal its readers.
        with self._lock:
            try:
                self._frame_streams.remove(stream)
            except ValueError:
                pass
            stream._push_closed()

    @classmethod
    def _next_consumer_name(cls) -> str:
        return f"frame_stream_{next(cls._STREAM_COUNTER)}"
Stores stream descriptors and fans out accepted frames to frame streams.
StreamBus( *, event_broker: BackendEventBroker, parent: object | None = None)
169 def __init__( 170 self, 171 *, 172 event_broker: BackendEventBroker, 173 parent: object | None = None, 174 ) -> None: 175 self._event_broker = event_broker 176 self._parent = parent 177 self._descriptors: dict[str, StreamDescriptor] = {} 178 self._frame_streams: list[FrameStream] = [] 179 self._lock = RLock()
181 def add_descriptor(self, descriptor: StreamDescriptor) -> None: 182 existing_descriptor = self._descriptors.get(descriptor.stream_id) 183 if existing_descriptor is not None: 184 if existing_descriptor == descriptor: 185 return 186 187 raise ValueError(f"conflicting descriptor for stream_id '{descriptor.stream_id}'") 188 189 self._descriptors[descriptor.stream_id] = descriptor
def
ingest_frame(self, frame: object) -> bool:
198 def ingest_frame(self, frame: object) -> bool: 199 if not isinstance(frame, FrameEnvelope): 200 return False 201 202 if frame.stream_id not in self._descriptors: 203 return False 204 205 with self._lock: 206 streams = tuple(self._frame_streams) 207 for stream in streams: 208 stream._publish(frame) 209 return True
def
open_frame_stream( self, *, maxsize: int = 256, drop_policy: Literal['drop_oldest', 'error'] = 'drop_oldest', consumer_name: str | None = None) -> FrameStream:
211 def open_frame_stream( 212 self, 213 *, 214 maxsize: int = 256, 215 drop_policy: FrameDropPolicy = "drop_oldest", 216 consumer_name: str | None = None, 217 ) -> FrameStream: 218 stream = FrameStream( 219 self, 220 maxsize=maxsize, 221 drop_policy=drop_policy, 222 consumer_name=consumer_name or self._next_consumer_name(), 223 ) 224 with self._lock: 225 self._frame_streams.append(stream) 226 return stream