modlink_core

 1from .bus import FrameStream, FrameStreamOverflowError, StreamBus
 2from .event_stream import (
 3    BackendEventBroker,
 4    EventStream,
 5    EventStreamOverflowError,
 6    StreamClosedError,
 7)
 8from .events import (
 9    DriverConnectionLostEvent,
10    DriverExecutorFailedEvent,
11    RecordingFailedEvent,
12    SettingChangedEvent,
13)
14from .logging_setup import configure_host_logging
15from .models import (
16    DriverSnapshot,
17    ExportJobSnapshot,
18    RecordingSnapshot,
19    RecordingStartSummary,
20    RecordingStopSummary,
21    ReplayMarker,
22    ReplayRecordingSummary,
23    ReplaySegment,
24    ReplaySnapshot,
25)
26from .recording import RecordingBackend
27from .replay import ReplayBackend
28from .runtime import ModLinkEngine
29from .settings import (
30    SettingsStore,
31)
32
33__all__ = [
34    "RecordingBackend",
35    "RecordingSnapshot",
36    "RecordingStartSummary",
37    "RecordingStopSummary",
38    "ReplayBackend",
39    "ReplayMarker",
40    "ReplayRecordingSummary",
41    "ReplaySegment",
42    "ReplaySnapshot",
43    "ExportJobSnapshot",
44    "BackendEventBroker",
45    "DriverConnectionLostEvent",
46    "DriverExecutorFailedEvent",
47    "EventStream",
48    "EventStreamOverflowError",
49    "FrameStream",
50    "FrameStreamOverflowError",
51    "configure_host_logging",
52    "DriverSnapshot",
53    "ModLinkEngine",
54    "RecordingFailedEvent",
55    "SettingChangedEvent",
56    "SettingsStore",
57    "StreamClosedError",
58    "StreamBus",
59]
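
The exports above form the package's public wiring surface. Below is a minimal start-up sketch; it assumes SettingsStore() and configure_host_logging() accept default, no-argument calls, which this listing does not itself show.

from modlink_core import (
    BackendEventBroker,
    RecordingBackend,
    ReplayBackend,
    SettingsStore,
    StreamBus,
    configure_host_logging,
)

configure_host_logging()      # assumption: no-argument call
settings = SettingsStore()    # assumption: default construction
broker = BackendEventBroker()
bus = StreamBus(event_broker=broker, parent=None)

# The recording backend persists live bus frames; the replay backend owns its
# own bus for playback.
recording = RecordingBackend(bus, settings=settings, publish_event=broker.publish)
replay = ReplayBackend(settings=settings)

recording.start()
replay.start()
# ... host application runs ...
recording.shutdown()
replay.shutdown()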
class RecordingBackend:
 46class RecordingBackend:
 47    """Threaded recording backend that persists bus frames to disk."""
 48
 49    def __init__(
 50        self,
 51        bus: StreamBus,
 52        *,
 53        settings: SettingsStore,
 54        publish_event: Callable[[BackendEvent], None],
 55        parent: object | None = None,
 56    ) -> None:
 57        self._bus = bus
 58        self._settings = settings
 59        self._publish_event = publish_event
 60        self._parent = parent
 61        self._command_queue: queue.Queue[RecordingCommand | object] = queue.Queue()
 62        self._shutdown_sentinel = object()
 63        self._thread: threading.Thread | None = None
 64        self._frame_stream: FrameStream | None = None
 65        self._state = "idle"
 66        self._started = False
 67        self._accepting_commands = False
 68        self._worker_exit_error: Exception | None = None
 69        self._active_recording: ActiveRecording | None = None
 70        self._lock = threading.RLock()
 71
 72    @property
 73    def root_dir(self) -> Path:
 74        return resolved_storage_root_dir(self._settings)
 75
 76    @property
 77    def is_started(self) -> bool:
 78        thread = self._thread
 79        return self._started and thread is not None and thread.is_alive()
 80
 81    @property
 82    def state(self) -> str:
 83        return self._state
 84
 85    @property
 86    def is_recording(self) -> bool:
 87        return self._state == "recording"
 88
 89    def snapshot(self) -> RecordingSnapshot:
 90        return RecordingSnapshot(
 91            state=self._state,
 92            is_started=self.is_started,
 93            is_recording=self.is_recording,
 94            root_dir=str(self.root_dir),
 95        )
 96
 97    def start(self) -> None:
 98        if self.is_started:
 99            self._started = True
100            self._accepting_commands = True
101            return
102        logger.info("Starting recording backend")
103        if self._frame_stream is None or self._frame_stream.closed:
104            self._frame_stream = self._open_frame_stream()
105        thread = threading.Thread(
106            target=self._run,
107            name="modlink.recording",
108            daemon=True,
109        )
110        self._thread = thread
111        self._worker_exit_error = None
112        self._accepting_commands = True
113        self._started = True
114        thread.start()
115        logger.info("Recording backend started")
116
117    def start_recording(self, recording_label: str | None = None) -> Future[RecordingStartSummary]:
118        storage_root_dir = self.root_dir
119        recordings_root_dir = storage_root_dir / "recordings"
120        return self._submit_command(
121            self._start_recording_worker,
122            str(storage_root_dir),
123            str(recordings_root_dir),
124            recording_label,
125            self._bus.descriptors(),
126        )
127
128    def stop_recording(self) -> Future[RecordingStopSummary]:
129        return self._submit_command(self._stop_recording_worker)
130
131    def add_marker(self, label: str | None = None) -> Future[None]:
132        return self._submit_command(self._add_marker_worker, label)
133
134    def add_segment(
135        self,
136        start_ns: int,
137        end_ns: int,
138        label: str | None = None,
139    ) -> Future[None]:
140        return self._submit_command(
141            self._add_segment_worker,
142            start_ns,
143            end_ns,
144            label,
145        )
146
147    def shutdown(self, *, timeout_ms: int = 3000) -> None:
148        logger.info("Shutting down recording backend")
149        with self._lock:
150            self._accepting_commands = False
151            thread = self._thread
152            worker_exit_error = self._worker_exit_error
153
154        if thread is None or not thread.is_alive():
155            self._finalize_shutdown()
156            if worker_exit_error is not None:
157                self._worker_exit_error = None
158                raise worker_exit_error
159            return
160
161        self._command_queue.put(self._shutdown_sentinel)
162        thread.join(max(0, timeout_ms) / 1000)
163
164        if thread.is_alive():
165            raise TimeoutError(f"recording shutdown timed out after {timeout_ms}ms")
166
167        self._finalize_shutdown()
168        worker_exit_error = self._take_worker_exit_error()
169        if worker_exit_error is not None:
170            raise worker_exit_error
171        logger.info("Recording backend shut down")
172
173    def _run(self) -> None:
174        exit_error: Exception | None = None
175        while True:
176            try:
177                if self._drain_commands():
178                    return
179
180                frame_stream = self._frame_stream
181                if frame_stream is None:
182                    time.sleep(0.01)
183                    continue
184
185                try:
186                    frame = frame_stream.read(timeout=0.05)
187                except queue.Empty:
188                    continue
189                except StreamClosedError:
190                    if not self._started:
191                        return
192                    logger.warning("Recording frame stream closed unexpectedly; reopening stream")
193                    self._frame_stream = self._open_frame_stream()
194                    continue
195                except FrameStreamOverflowError as exc:
196                    self._handle_frame_stream_overflow(exc.consumer_name)
197                    continue
198
199                if not self.is_recording:
200                    continue
201                self._on_frame_worker(frame)
202            except Exception as exc:
203                logger.exception("Recording worker exited with an unexpected error")
204                exit_error = exc
205                break
206
207        with self._lock:
208            self._worker_exit_error = exit_error
209            self._started = False
210            self._accepting_commands = False
211
212    def _drain_commands(self) -> bool:
213        while True:
214            try:
215                item = self._command_queue.get_nowait()
216            except queue.Empty:
217                return False
218
219            if item is self._shutdown_sentinel:
220                try:
221                    self._shutdown_worker()
222                finally:
223                    self._fail_pending_commands("ACQ_SHUTDOWN")
224                return True
225
226            command, _future = item
227            command()
228
229    def _on_frame_worker(self, frame: object) -> None:
230        if not isinstance(frame, FrameEnvelope):
231            return
232        if self._active_recording is None:
233            return
234
235        try:
236            append_recording_frame(
237                Path(self._active_recording.root_dir),
238                self._active_recording.recording_id,
239                frame,
240            )
241            self._active_recording.frame_counts_by_stream[frame.stream_id] += 1
242        except Exception:
243            logger.exception("Recording append failed; marking recording as failed")
244            self._fail_recording_worker("write_failed")
245
246    def _start_recording_worker(
247        self,
248        root_dir: str,
249        recordings_dir: str,
250        recording_label: object,
251        recording_descriptors: object,
252    ) -> RecordingStartSummary:
253        if self._active_recording is not None:
254            raise RuntimeError("ACQ_ALREADY_RECORDING")
255
256        if not isinstance(recording_descriptors, dict) or not all(
257            isinstance(value, StreamDescriptor) for value in recording_descriptors.values()
258        ):
259            raise RuntimeError("ACQ_INVALID_DESCRIPTOR_SNAPSHOT")
260
261        started_at_ns = time.time_ns()
262        try:
263            recording_id = create_recording(
264                Path(root_dir),
265                recording_descriptors,
266                recording_label=recording_label or None,
267            )
268        except Exception as exc:
269            self._active_recording = None
270            raise RuntimeError(f"ACQ_START_FAILED: {type(exc).__name__}: {exc}") from exc
271
272        recording_path = str(Path(recordings_dir) / recording_id)
273        self._active_recording = ActiveRecording(
274            root_dir=root_dir,
275            recording_id=recording_id,
276            recording_path=recording_path,
277            started_at_ns=started_at_ns,
278            frame_counts_by_stream={stream_id: 0 for stream_id in recording_descriptors},
279        )
280        self._set_state("recording")
281        logger.info("Started recording %s at %s", recording_id, recording_path)
282        return RecordingStartSummary(
283            recording_id=recording_id,
284            recording_path=recording_path,
285            started_at_ns=started_at_ns,
286        )
287
288    def _stop_recording_worker(self) -> RecordingStopSummary:
289        return self._stop_recording_worker_with_policy(publish_failure=True)
290
291    def _stop_recording_worker_with_policy(
292        self,
293        *,
294        publish_failure: bool,
295    ) -> RecordingStopSummary:
296        if self._active_recording is None:
297            raise RuntimeError("ACQ_STOP_IGNORED: not recording")
298
299        _ = publish_failure
300        stopped_at_ns = time.time_ns()
301        active_recording = self._active_recording
302        self._active_recording = None
303        self._set_state("idle")
304        logger.info("Stopped recording %s", active_recording.recording_id)
305        return RecordingStopSummary(
306            recording_id=active_recording.recording_id,
307            recording_path=active_recording.recording_path,
308            started_at_ns=active_recording.started_at_ns,
309            stopped_at_ns=stopped_at_ns,
310            status="completed",
311            frame_counts_by_stream=dict(active_recording.frame_counts_by_stream),
312        )
313
314    def _add_marker_worker(self, label: object) -> None:
315        if self._active_recording is None:
316            raise RuntimeError("ACQ_MARKER_REJECTED: not recording")
317
318        timestamp_ns = time.time_ns()
319        normalized_label = label or None
320        try:
321            add_recording_marker(
322                Path(self._active_recording.root_dir),
323                self._active_recording.recording_id,
324                timestamp_ns=timestamp_ns,
325                label=normalized_label,
326            )
327        except Exception as exc:
328            raise RuntimeError(f"ACQ_MARKER_FAILED: {type(exc).__name__}: {exc}") from exc
329
330    def _add_segment_worker(
331        self,
332        start_ns: object,
333        end_ns: object,
334        label: object,
335    ) -> None:
336        if self._active_recording is None:
337            raise RuntimeError("ACQ_SEGMENT_REJECTED: not recording")
338
339        try:
340            start_value = int(start_ns)
341            end_value = int(end_ns)
342        except (TypeError, ValueError) as exc:
343            raise RuntimeError("ACQ_INVALID_SEGMENT_RANGE") from exc
344
345        if start_value > end_value:
346            raise RuntimeError("ACQ_INVALID_SEGMENT_RANGE: start_ns > end_ns")
347
348        normalized_label = label or None
349        try:
350            add_recording_segment(
351                Path(self._active_recording.root_dir),
352                self._active_recording.recording_id,
353                start_ns=start_value,
354                end_ns=end_value,
355                label=normalized_label,
356            )
357        except Exception as exc:
358            raise RuntimeError(f"ACQ_SEGMENT_FAILED: {type(exc).__name__}: {exc}") from exc
359
360    def _handle_frame_stream_overflow(self, consumer_name: str) -> None:
361        logger.warning("Recording frame stream overflowed for consumer '%s'", consumer_name)
362        if self._active_recording is not None:
363            self._fail_recording_worker("frame_stream_overflow")
364        self._replace_frame_stream()
365
366    def _replace_frame_stream(self) -> None:
367        previous_stream = self._frame_stream
368        self._frame_stream = self._open_frame_stream()
369        if previous_stream is not None:
370            previous_stream.close()
371
372    def _open_frame_stream(self) -> FrameStream:
373        return self._bus.open_frame_stream(
374            maxsize=256,
375            drop_policy="error",
376            consumer_name=RECORDING_CONSUMER_NAME,
377        )
378
379    def _shutdown_worker(self) -> None:
380        if self._active_recording is not None:
381            self._stop_recording_worker_with_policy(publish_failure=False)
382
383    def _fail_recording_worker(self, reason: str) -> None:
384        active_recording = self._active_recording
385        if active_recording is None:
386            return
387
388        stopped_at_ns = time.time_ns()
389        self._active_recording = None
390        self._set_state("idle")
391        self._publish_recording_failed(
392            active_recording,
393            stopped_at_ns=stopped_at_ns,
394            reason=reason,
395        )
396
397    def _publish_recording_failed(
398        self,
399        active_recording: ActiveRecording,
400        *,
401        stopped_at_ns: int,
402        reason: str,
403    ) -> None:
404        self._publish_event(
405            RecordingFailedEvent(
406                recording_id=active_recording.recording_id,
407                recording_path=active_recording.recording_path,
408                frame_counts_by_stream=dict(active_recording.frame_counts_by_stream),
409                reason=reason,
410                ts_ns=stopped_at_ns,
411            )
412        )
413
414    def _set_state(self, state: str) -> None:
415        with self._lock:
416            if state == self._state:
417                return
418            self._state = state
419
420    def _submit_command(
421        self,
422        worker_method: Callable[..., object],
423        *args: object,
424    ) -> Future[object]:
425        future: Future[object] = Future()
426        if not self.is_started:
427            future.set_exception(RuntimeError("ACQ_NOT_STARTED"))
428            return future
429        if not self._accepting_commands:
430            future.set_exception(RuntimeError("ACQ_SHUTTING_DOWN"))
431            return future
432
433        def _run_command() -> None:
434            try:
435                result = worker_method(*args)
436            except Exception as exc:
437                if not future.done():
438                    future.set_exception(exc)
439                return
440            if not future.done():
441                future.set_result(result)
442
443        self._command_queue.put((_run_command, future))
444        return future
445
446    def _fail_pending_commands(self, message: str) -> None:
447        while True:
448            try:
449                item = self._command_queue.get_nowait()
450            except queue.Empty:
451                return
452            if item is self._shutdown_sentinel:
453                continue
454            _command, future = item
455            if future is None or future.done():
456                continue
457            future.set_exception(RuntimeError(message))
458
459    def _finalize_shutdown(self) -> None:
460        self._set_state("idle")
461        self._started = False
462        self._accepting_commands = True
463        self._thread = None
464        if self._frame_stream is not None:
465            self._frame_stream.close()
466            self._frame_stream = None
467
468    def _take_worker_exit_error(self) -> Exception | None:
469        with self._lock:
470            error = self._worker_exit_error
471            self._worker_exit_error = None
472            return error

Threaded recording backend that persists bus frames to disk.

RecordingBackend( bus: StreamBus, *, settings: SettingsStore, publish_event: Callable[[BackendEvent], None], parent: object | None = None)
root_dir: Path
is_started: bool
state: str
is_recording: bool
def snapshot(self) -> RecordingSnapshot:
def start(self) -> None:
def start_recording(self, recording_label: str | None = None) -> Future[RecordingStartSummary]:
def stop_recording(self) -> Future[RecordingStopSummary]:
def add_marker(self, label: str | None = None) -> Future[None]:
def add_segment(self, start_ns: int, end_ns: int, label: str | None = None) -> Future[None]:
def shutdown(self, *, timeout_ms: int = 3000) -> None:
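
Every public command is executed on the backend's worker thread and returned as a Future, so callers decide where to block. A sketch of one recording session, continuing from the started recording backend in the wiring sketch near the top of this page (label text and timeouts are illustrative):

summary = recording.start_recording("bench-run").result(timeout=5.0)
print("writing to", summary.recording_path)

recording.add_marker("warm-up complete").result(timeout=5.0)
recording.add_segment(
    summary.started_at_ns,
    summary.started_at_ns + 1_000_000_000,  # one second after the start
    "first second",
).result(timeout=5.0)

stop = recording.stop_recording().result(timeout=5.0)
print(stop.status, stop.frame_counts_by_stream)

try:
    recording.shutdown(timeout_ms=3000)
except TimeoutError:
    pass  # the worker did not drain its command queue in time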
@dataclass(frozen=True, slots=True)
class RecordingSnapshot:
17@dataclass(frozen=True, slots=True)
18class RecordingSnapshot:
19    state: str
20    is_started: bool
21    is_recording: bool
22    root_dir: str
RecordingSnapshot(state: str, is_started: bool, is_recording: bool, root_dir: str)
state: str
is_started: bool
is_recording: bool
root_dir: str
@dataclass(frozen=True, slots=True)
class RecordingStartSummary:
25@dataclass(frozen=True, slots=True)
26class RecordingStartSummary:
27    recording_id: str
28    recording_path: str
29    started_at_ns: int
RecordingStartSummary(recording_id: str, recording_path: str, started_at_ns: int)
recording_id: str
recording_path: str
started_at_ns: int
@dataclass(frozen=True, slots=True)
class RecordingStopSummary:
32@dataclass(frozen=True, slots=True)
33class RecordingStopSummary:
34    recording_id: str
35    recording_path: str
36    started_at_ns: int
37    stopped_at_ns: int
38    status: str
39    frame_counts_by_stream: dict[str, int]
RecordingStopSummary( recording_id: str, recording_path: str, started_at_ns: int, stopped_at_ns: int, status: str, frame_counts_by_stream: dict[str, int])
recording_id: str
recording_path: str
started_at_ns: int
stopped_at_ns: int
status: str
frame_counts_by_stream: dict[str, int]
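
A small helper, not part of the package, that summarizes a stop result using the fields above:

def describe_stop(summary: RecordingStopSummary) -> str:
    total_frames = sum(summary.frame_counts_by_stream.values())
    duration_s = (summary.stopped_at_ns - summary.started_at_ns) / 1e9
    return f"{summary.recording_id}: {summary.status}, {total_frames} frames in {duration_s:.2f}s"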
class ReplayBackend:
 31class ReplayBackend:
 32    def __init__(
 33        self,
 34        *,
 35        settings: SettingsStore,
 36        parent: object | None = None,
 37    ) -> None:
 38        self._settings = settings
 39        self._parent = parent
 40        self.bus = StreamBus(event_broker=BackendEventBroker(), parent=self)
 41        self._export_service = ExportService()
 42        self._command_queue: queue.Queue[ReplayCommand | object] = queue.Queue()
 43        self._shutdown_sentinel = object()
 44        self._thread: threading.Thread | None = None
 45        self._state = "idle"
 46        self._started = False
 47        self._accepting_commands = False
 48        self._worker_exit_error: Exception | None = None
 49        self._lock = threading.RLock()
 50        self._recordings: tuple[ReplayRecordingSummary, ...] = ()
 51        self._reader: RecordingReader | None = None
 52        self._markers: tuple[ReplayMarker, ...] = ()
 53        self._segments: tuple[ReplaySegment, ...] = ()
 54        self._position_ns = 0
 55        self._speed_multiplier = 1.0
 56        self._timeline_index = 0
 57        self._play_started_wall_ns = 0
 58        self._play_started_position_ns = 0
 59
 60    @property
 61    def is_started(self) -> bool:
 62        thread = self._thread
 63        return self._started and thread is not None and thread.is_alive()
 64
 65    def start(self) -> None:
 66        if self.is_started:
 67            self._started = True
 68            self._accepting_commands = True
 69            return
 70        logger.info("Starting replay backend")
 71        self._export_service.start()
 72        thread = threading.Thread(
 73            target=self._run,
 74            name="modlink.replay",
 75            daemon=True,
 76        )
 77        self._thread = thread
 78        self._worker_exit_error = None
 79        self._accepting_commands = True
 80        self._started = True
 81        thread.start()
 82        self.refresh_recordings()
 83
 84    def shutdown(self, *, timeout_ms: int = 3000) -> None:
 85        logger.info("Shutting down replay backend")
 86        with self._lock:
 87            self._accepting_commands = False
 88            thread = self._thread
 89            worker_exit_error = self._worker_exit_error
 90
 91        if thread is None or not thread.is_alive():
 92            self._finalize_shutdown()
 93            self._export_service.shutdown(timeout_ms=timeout_ms)
 94            if worker_exit_error is not None:
 95                self._worker_exit_error = None
 96                raise worker_exit_error
 97            return
 98
 99        self._command_queue.put(self._shutdown_sentinel)
100        thread.join(max(0, timeout_ms) / 1000)
101        if thread.is_alive():
102            raise TimeoutError(f"replay shutdown timed out after {timeout_ms}ms")
103
104        self._finalize_shutdown()
105        self._export_service.shutdown(timeout_ms=timeout_ms)
106        worker_exit_error = self._take_worker_exit_error()
107        if worker_exit_error is not None:
108            raise worker_exit_error
109
110    def snapshot(self) -> ReplaySnapshot:
111        reader = self._reader
112        return ReplaySnapshot(
113            state=self._state,
114            is_started=self.is_started,
115            recording_id=None if reader is None else reader.recording_id,
116            recording_path=None if reader is None else str(reader.recording_path),
117            position_ns=int(self._position_ns),
118            duration_ns=0 if reader is None else int(reader.duration_ns),
119            speed_multiplier=float(self._speed_multiplier),
120        )
121
122    def recordings(self) -> tuple[ReplayRecordingSummary, ...]:
123        return self._recordings
124
125    def markers(self) -> tuple[ReplayMarker, ...]:
126        return self._markers
127
128    def segments(self) -> tuple[ReplaySegment, ...]:
129        return self._segments
130
131    def export_jobs(self) -> tuple[ExportJobSnapshot, ...]:
132        return self._export_service.jobs()
133
134    def refresh_recordings(self) -> Future[tuple[ReplayRecordingSummary, ...]]:
135        return self._submit_command(self._refresh_recordings_worker)
136
137    def open_recording(self, recording_path: str | Path) -> Future[ReplaySnapshot]:
138        return self._submit_command(self._open_recording_worker, recording_path)
139
140    def play(self) -> Future[None]:
141        return self._submit_command(self._play_worker)
142
143    def pause(self) -> Future[None]:
144        return self._submit_command(self._pause_worker)
145
146    def stop(self) -> Future[None]:
147        return self._submit_command(self._stop_worker)
148
149    def set_speed(self, multiplier: float) -> Future[float]:
150        return self._submit_command(self._set_speed_worker, multiplier)
151
152    def start_export(self, format_id: str) -> Future[ExportJobSnapshot]:
153        return self._submit_command(self._start_export_worker, format_id)
154
155    def _run(self) -> None:
156        exit_error: Exception | None = None
157        while True:
158            try:
159                if self._drain_commands():
160                    return
161                if self._state != "playing":
162                    time.sleep(0.01)
163                    continue
164                self._tick_playback()
165            except Exception as exc:
166                logger.exception("Replay worker exited with an unexpected error")
167                exit_error = exc
168                break
169
170        with self._lock:
171            self._worker_exit_error = exit_error
172            self._started = False
173            self._accepting_commands = False
174
175    def _drain_commands(self) -> bool:
176        while True:
177            try:
178                item = self._command_queue.get_nowait()
179            except queue.Empty:
180                return False
181
182            if item is self._shutdown_sentinel:
183                return True
184
185            command, _future = item
186            command()
187
188    def _refresh_recordings_worker(self) -> tuple[ReplayRecordingSummary, ...]:
189        root_dir = resolved_storage_root_dir(self._settings)
190        summaries: list[ReplayRecordingSummary] = []
191        for manifest in list_recordings(root_dir):
192            recording_id = manifest.get("recording_id")
193            if not isinstance(recording_id, str) or recording_id == "":
194                continue
195            stream_ids_payload = manifest.get("stream_ids", [])
196            stream_ids = tuple(
197                stream_id for stream_id in stream_ids_payload if isinstance(stream_id, str)
198            )
199            label = manifest.get("recording_label")
200            summaries.append(
201                ReplayRecordingSummary(
202                    recording_id=recording_id,
203                    recording_label=label
204                    if isinstance(label, str) or label is None
205                    else str(label),
206                    recording_path=str(root_dir / "recordings" / recording_id),
207                    stream_ids=stream_ids,
208                )
209            )
210        self._recordings = tuple(summaries)
211        return self._recordings
212
213    def _open_recording_worker(self, recording_path: str | Path) -> ReplaySnapshot:
214        logger.info("Opening replay recording from %s", recording_path)
215        reader = RecordingReader(recording_path)
216        self._reader = reader
217        self._markers = reader.markers()
218        self._segments = reader.segments()
219        self._position_ns = 0
220        self._speed_multiplier = 1.0
221        self._timeline_index = 0
222        self._play_started_wall_ns = 0
223        self._play_started_position_ns = 0
224        self._rebuild_bus(reader)
225        self._set_state("ready")
226        logger.debug(
227            "Replay recording opened: recording_id=%s duration_ns=%s stream_count=%s",
228            reader.recording_id,
229            reader.duration_ns,
230            len(reader.descriptors()),
231        )
232        return self.snapshot()
233
234    def _play_worker(self) -> None:
235        reader = self._require_reader()
236        if not reader.frames():
237            raise RuntimeError("REPLAY_EMPTY_RECORDING")
238        if self._state == "playing":
239            return
240        if self._state == "finished":
241            self._position_ns = 0
242            self._timeline_index = 0
243        self._play_started_wall_ns = time.monotonic_ns()
244        self._play_started_position_ns = self._position_ns
245        self._set_state("playing")
246
247    def _pause_worker(self) -> None:
248        if self._state != "playing":
249            raise RuntimeError("REPLAY_NOT_PLAYING")
250        self._sync_playback_position(time.monotonic_ns())
251        self._set_state("paused")
252
253    def _stop_worker(self) -> None:
254        self._require_reader()
255        self._position_ns = 0
256        self._timeline_index = 0
257        self._play_started_wall_ns = 0
258        self._play_started_position_ns = 0
259        self._set_state("ready")
260
261    def _set_speed_worker(self, multiplier: object) -> float:
262        try:
263            resolved = float(multiplier)
264        except (TypeError, ValueError) as exc:
265            raise RuntimeError("REPLAY_INVALID_SPEED") from exc
266        if resolved not in {1.0, 2.0, 4.0}:
267            raise RuntimeError("REPLAY_INVALID_SPEED")
268        if self._state == "playing":
269            current_time_ns = time.monotonic_ns()
270            self._sync_playback_position(current_time_ns)
271            self._play_started_wall_ns = current_time_ns
272            self._play_started_position_ns = self._position_ns
273        self._speed_multiplier = resolved
274        return self._speed_multiplier
275
276    def _start_export_worker(self, format_id: object) -> ExportJobSnapshot:
277        reader = self._require_reader()
278        if not isinstance(format_id, str) or format_id.strip() == "":
279            raise RuntimeError("REPLAY_EXPORT_FORMAT_UNSUPPORTED")
280        export_root_dir = resolved_export_root_dir(self._settings)
281        return self._export_service.enqueue(reader, format_id, export_root_dir)
282
283    def _tick_playback(self) -> None:
284        current_time_ns = time.monotonic_ns()
285        target_position_ns = self._sync_playback_position(current_time_ns)
286        reader = self._reader
287        if reader is None:
288            return
289        if (
290            self._timeline_index >= len(reader.frames())
291            and target_position_ns >= reader.duration_ns
292        ):
293            self._position_ns = reader.duration_ns
294            self._set_state("finished")
295            return
296        time.sleep(0.005)
297
298    def _sync_playback_position(self, current_time_ns: int) -> int:
299        reader = self._reader
300        if reader is None:
301            return self._position_ns
302        target_position_ns = min(
303            reader.duration_ns,
304            int(
305                self._play_started_position_ns
306                + (current_time_ns - self._play_started_wall_ns) * self._speed_multiplier
307            ),
308        )
309        self._emit_due_frames(target_position_ns)
310        self._position_ns = target_position_ns
311        return target_position_ns
312
313    def _emit_due_frames(self, target_position_ns: int) -> None:
314        reader = self._reader
315        if reader is None:
316            return
317        timeline = reader.frames()
318        while self._timeline_index < len(timeline):
319            ref = timeline[self._timeline_index]
320            if ref.relative_timestamp_ns > target_position_ns:
321                break
322            self.bus.ingest_frame(reader.load_frame(ref))
323            self._timeline_index += 1
324
325    def _rebuild_bus(self, reader: RecordingReader) -> None:
326        for stream_id in tuple(self.bus.descriptors()):
327            self.bus.remove_descriptor(stream_id)
328        self.bus.add_descriptors(reader.descriptors().values())
329
330    def _set_state(self, state: str) -> None:
331        with self._lock:
332            if self._state == state:
333                return
334            self._state = state
335
336    def _require_reader(self) -> RecordingReader:
337        if self._reader is None:
338            raise RuntimeError("REPLAY_NOT_OPEN")
339        return self._reader
340
341    def _submit_command(
342        self,
343        worker_method: Callable[..., object],
344        *args: object,
345    ) -> Future[object]:
346        future: Future[object] = Future()
347        if not self.is_started:
348            future.set_exception(RuntimeError("REPLAY_NOT_STARTED"))
349            return future
350        if not self._accepting_commands:
351            future.set_exception(RuntimeError("REPLAY_SHUTTING_DOWN"))
352            return future
353
354        def _run_command() -> None:
355            try:
356                result = worker_method(*args)
357            except Exception as exc:
358                if not future.done():
359                    future.set_exception(exc)
360                return
361            if not future.done():
362                future.set_result(result)
363
364        self._command_queue.put((_run_command, future))
365        return future
366
367    def _finalize_shutdown(self) -> None:
368        self._set_state("idle")
369        self._started = False
370        self._accepting_commands = True
371        self._thread = None
372
373    def _take_worker_exit_error(self) -> Exception | None:
374        with self._lock:
375            error = self._worker_exit_error
376            self._worker_exit_error = None
377            return error
ReplayBackend( *, settings: SettingsStore, parent: object | None = None)
bus
is_started: bool
def start(self) -> None:
def shutdown(self, *, timeout_ms: int = 3000) -> None:
def snapshot(self) -> ReplaySnapshot:
def recordings(self) -> tuple[ReplayRecordingSummary, ...]:
def markers(self) -> tuple[ReplayMarker, ...]:
def segments(self) -> tuple[ReplaySegment, ...]:
def export_jobs(self) -> tuple[ExportJobSnapshot, ...]:
def refresh_recordings(self) -> Future[tuple[ReplayRecordingSummary, ...]]:
def open_recording(self, recording_path: str | Path) -> Future[ReplaySnapshot]:
def play(self) -> Future[None]:
def pause(self) -> Future[None]:
def stop(self) -> Future[None]:
def set_speed(self, multiplier: float) -> Future[float]:
def start_export(self, format_id: str) -> Future[ExportJobSnapshot]:
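
As with RecordingBackend, every command resolves on the replay worker thread. A playback sketch, continuing from the started replay backend in the wiring sketch; note that set_speed only accepts 1.0, 2.0 or 4.0:

import time

recordings = replay.refresh_recordings().result(timeout=5.0)
if recordings:
    replay.open_recording(recordings[0].recording_path).result(timeout=5.0)
    replay.set_speed(2.0).result(timeout=5.0)
    replay.play().result(timeout=5.0)

    while replay.snapshot().state == "playing":
        time.sleep(0.1)

    replay.stop().result(timeout=5.0)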
@dataclass(frozen=True, slots=True)
class ReplayMarker:
61@dataclass(frozen=True, slots=True)
62class ReplayMarker:
63    timestamp_ns: int
64    label: str | None
ReplayMarker(timestamp_ns: int, label: str | None)
timestamp_ns: int
label: str | None
@dataclass(frozen=True, slots=True)
class ReplayRecordingSummary:
53@dataclass(frozen=True, slots=True)
54class ReplayRecordingSummary:
55    recording_id: str
56    recording_label: str | None
57    recording_path: str
58    stream_ids: tuple[str, ...]
ReplayRecordingSummary( recording_id: str, recording_label: str | None, recording_path: str, stream_ids: tuple[str, ...])
recording_id: str
recording_label: str | None
recording_path: str
stream_ids: tuple[str, ...]
@dataclass(frozen=True, slots=True)
class ReplaySegment:
67@dataclass(frozen=True, slots=True)
68class ReplaySegment:
69    start_ns: int
70    end_ns: int
71    label: str | None
ReplaySegment(start_ns: int, end_ns: int, label: str | None)
start_ns: int
end_ns: int
label: str | None
@dataclass(frozen=True, slots=True)
class ReplaySnapshot:
42@dataclass(frozen=True, slots=True)
43class ReplaySnapshot:
44    state: str
45    is_started: bool
46    recording_id: str | None
47    recording_path: str | None
48    position_ns: int
49    duration_ns: int
50    speed_multiplier: float
ReplaySnapshot( state: str, is_started: bool, recording_id: str | None, recording_path: str | None, position_ns: int, duration_ns: int, speed_multiplier: float)
state: str
is_started: bool
recording_id: str | None
recording_path: str | None
position_ns: int
duration_ns: int
speed_multiplier: float
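
Deriving a progress fraction from a snapshot is a one-liner over the fields above (a sketch, not part of the package):

def playback_progress(snapshot: ReplaySnapshot) -> float:
    if snapshot.duration_ns <= 0:
        return 0.0
    return min(1.0, snapshot.position_ns / snapshot.duration_ns)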
@dataclass(frozen=True, slots=True)
class ExportJobSnapshot:
74@dataclass(frozen=True, slots=True)
75class ExportJobSnapshot:
76    job_id: str
77    recording_id: str
78    format_id: str
79    state: str
80    progress: float
81    output_path: str | None
82    error: str | None
ExportJobSnapshot( job_id: str, recording_id: str, format_id: str, state: str, progress: float, output_path: str | None, error: str | None)
job_id: str
recording_id: str
format_id: str
state: str
progress: float
output_path: str | None
error: str | None
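
Export jobs run inside the replay backend's export service and are observed by polling export_jobs(). The sketch below treats a non-None error or progress >= 1.0 as terminal and uses a placeholder format id; the actual job states and supported formats are not listed on this page.

import time

job = replay.start_export("csv").result(timeout=5.0)  # "csv" is a placeholder format id
while True:
    current = next((j for j in replay.export_jobs() if j.job_id == job.job_id), job)
    if current.error is not None or current.progress >= 1.0:
        break
    time.sleep(0.2)
print(current.state, current.output_path or current.error)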
class BackendEventBroker:
119class BackendEventBroker:
120    def __init__(self) -> None:
121        self._streams: list[EventStream] = []
122        self._lock = RLock()
123
124    def open_stream(self, *, maxsize: int = 1024) -> EventStream:
125        stream = EventStream(self, maxsize=maxsize)
126        with self._lock:
127            self._streams.append(stream)
128        return stream
129
130    def publish(self, event: BackendEvent) -> None:
131        with self._lock:
132            streams = tuple(self._streams)
133        for stream in streams:
134            stream._publish(event)
135
136    def _close_stream(self, stream: EventStream) -> None:
137        with self._lock:
138            try:
139                self._streams.remove(stream)
140            except ValueError:
141                pass
142        stream._push_closed()
def open_stream(self, *, maxsize: int = 1024) -> EventStream:
def publish(self, event: BackendEvent) -> None:
@dataclass(frozen=True, slots=True)
class DriverConnectionLostEvent:
 8@dataclass(frozen=True, slots=True)
 9class DriverConnectionLostEvent:
10    driver_id: str
11    detail: object | None = None
12    kind: Literal["driver_connection_lost"] = "driver_connection_lost"
DriverConnectionLostEvent( driver_id: str, detail: object | None = None, kind: Literal['driver_connection_lost'] = 'driver_connection_lost')
driver_id: str
detail: object | None
kind: Literal['driver_connection_lost']
@dataclass(frozen=True, slots=True)
class DriverExecutorFailedEvent:
15@dataclass(frozen=True, slots=True)
16class DriverExecutorFailedEvent:
17    driver_id: str
18    detail: object
19    kind: Literal["driver_executor_failed"] = "driver_executor_failed"
DriverExecutorFailedEvent( driver_id: str, detail: object, kind: Literal['driver_executor_failed'] = 'driver_executor_failed')
driver_id: str
detail: object
kind: Literal['driver_executor_failed']
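
Events are frozen dataclasses, so consumers can dispatch on the concrete type (or on the kind literal). A dispatch sketch; the RecordingFailedEvent fields used here are the ones RecordingBackend publishes:

from modlink_core import (
    DriverConnectionLostEvent,
    DriverExecutorFailedEvent,
    RecordingFailedEvent,
)

def handle_event(event) -> None:
    if isinstance(event, DriverConnectionLostEvent):
        print("driver connection lost:", event.driver_id, event.detail)
    elif isinstance(event, DriverExecutorFailedEvent):
        print("driver executor failed:", event.driver_id, event.detail)
    elif isinstance(event, RecordingFailedEvent):
        print("recording failed:", event.recording_id, event.reason)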
class EventStream:
 22class EventStream:
 23    def __init__(
 24        self,
 25        broker: BackendEventBroker,
 26        *,
 27        maxsize: int = 1024,
 28    ) -> None:
 29        normalized_maxsize = max(1, int(maxsize))
 30        self._broker = broker
 31        self._queue: queue.Queue[BackendEvent | object] = queue.Queue(maxsize=normalized_maxsize)
 32        self._lock = RLock()
 33        self._closed = False
 34
 35    @property
 36    def closed(self) -> bool:
 37        return self._closed
 38
 39    def read(
 40        self,
 41        *,
 42        block: bool = True,
 43        timeout: float | None = None,
 44    ) -> BackendEvent:
 45        item = self._queue.get(block=block, timeout=timeout)
 46        if item is _STREAM_CLOSED:
 47            raise StreamClosedError("event stream is closed")
 48        if item is _STREAM_OVERFLOW:
 49            raise EventStreamOverflowError("event stream overflowed")
 50        return item  # type: ignore[return-value]
 51
 52    def read_many(self, *, max_items: int | None = None) -> list[BackendEvent]:
 53        if max_items is not None and max_items <= 0:
 54            return []
 55
 56        items: list[BackendEvent] = []
 57        while max_items is None or len(items) < max_items:
 58            try:
 59                item = self._queue.get_nowait()
 60            except queue.Empty:
 61                break
 62            if item is _STREAM_CLOSED:
 63                if items:
 64                    return items
 65                raise StreamClosedError("event stream is closed")
 66            if item is _STREAM_OVERFLOW:
 67                if items:
 68                    return items
 69                raise EventStreamOverflowError("event stream overflowed")
 70            items.append(item)  # type: ignore[arg-type]
 71        return items
 72
 73    def close(self) -> None:
 74        with self._lock:
 75            if self._closed:
 76                return
 77            self._closed = True
 78        self._broker._close_stream(self)
 79
 80    def _publish(self, event: BackendEvent) -> None:
 81        with self._lock:
 82            if self._closed:
 83                return
 84            try:
 85                self._queue.put_nowait(event)
 86                return
 87            except queue.Full:
 88                pass
 89
 90            try:
 91                self._queue.get_nowait()
 92            except queue.Empty:
 93                pass
 94
 95            try:
 96                self._queue.put_nowait(_STREAM_OVERFLOW)
 97            except queue.Full:
 98                pass
 99
100    def _push_closed(self) -> None:
101        with self._lock:
102            try:
103                self._queue.put_nowait(_STREAM_CLOSED)
104                return
105            except queue.Full:
106                pass
107
108            try:
109                self._queue.get_nowait()
110            except queue.Empty:
111                pass
112
113            try:
114                self._queue.put_nowait(_STREAM_CLOSED)
115            except queue.Full:
116                pass
EventStream( broker: BackendEventBroker, *, maxsize: int = 1024)
closed: bool
def read( self, *, block: bool = True, timeout: float | None = None) -> BackendEvent:
def read_many(self, *, max_items: int | None = None) -> list[BackendEvent]:
def close(self) -> None:
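
Overflow and closure are delivered in-band as sentinel items and surface as exceptions on the reader side, so a poll loop has to handle both. A consumer sketch that reuses the broker and handle_event from the earlier sketches:

import time

from modlink_core import EventStreamOverflowError, StreamClosedError

stream = broker.open_stream(maxsize=1024)
try:
    while True:
        try:
            for event in stream.read_many(max_items=64):
                handle_event(event)
        except EventStreamOverflowError:
            continue  # the consumer fell behind; at least one event was dropped
        time.sleep(0.05)
except StreamClosedError:
    pass  # the broker closed this stream
finally:
    stream.close()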
class EventStreamOverflowError(builtins.Exception):
14class EventStreamOverflowError(Exception):
15    """Raised when the low-frequency event stream overflows."""

Raised when the low-frequency event stream overflows.

class FrameStream:
 26class FrameStream:
 27    def __init__(
 28        self,
 29        bus: StreamBus,
 30        *,
 31        maxsize: int = 256,
 32        drop_policy: FrameDropPolicy = "drop_oldest",
 33        consumer_name: str,
 34    ) -> None:
 35        if drop_policy not in {"drop_oldest", "error"}:
 36            raise ValueError(f"unsupported frame drop policy: {drop_policy}")
 37        normalized_maxsize = max(1, int(maxsize))
 38        self._bus = bus
 39        self._queue: queue.Queue[FrameEnvelope | object] = queue.Queue(maxsize=normalized_maxsize)
 40        self._drop_policy = drop_policy
 41        self._consumer_name = consumer_name
 42        self._dropped_count = 0
 43        self._closed = False
 44        self._overflowed = False
 45        self._lock = RLock()
 46
 47    @property
 48    def consumer_name(self) -> str:
 49        return self._consumer_name
 50
 51    @property
 52    def dropped_count(self) -> int:
 53        return self._dropped_count
 54
 55    @property
 56    def overflowed(self) -> bool:
 57        return self._overflowed
 58
 59    @property
 60    def closed(self) -> bool:
 61        return self._closed
 62
 63    def read(
 64        self,
 65        *,
 66        block: bool = True,
 67        timeout: float | None = None,
 68    ) -> FrameEnvelope:
 69        item = self._queue.get(block=block, timeout=timeout)
 70        if item is _FRAME_STREAM_CLOSED:
 71            raise StreamClosedError("frame stream is closed")
 72        if item is _FRAME_STREAM_OVERFLOW:
 73            raise FrameStreamOverflowError(self._consumer_name)
 74        return item  # type: ignore[return-value]
 75
 76    def read_many(self, *, max_items: int | None = None) -> list[FrameEnvelope]:
 77        if max_items is not None and max_items <= 0:
 78            return []
 79
 80        items: list[FrameEnvelope] = []
 81        while max_items is None or len(items) < max_items:
 82            try:
 83                item = self._queue.get_nowait()
 84            except queue.Empty:
 85                break
 86            if item is _FRAME_STREAM_CLOSED:
 87                if items:
 88                    return items
 89                raise StreamClosedError("frame stream is closed")
 90            if item is _FRAME_STREAM_OVERFLOW:
 91                if items:
 92                    return items
 93                raise FrameStreamOverflowError(self._consumer_name)
 94            items.append(item)  # type: ignore[arg-type]
 95        return items
 96
 97    def close(self) -> None:
 98        with self._lock:
 99            if self._closed:
100                return
101            self._closed = True
102        self._bus._close_frame_stream(self)
103
104    def _publish(self, frame: FrameEnvelope) -> None:
105        with self._lock:
106            if self._closed:
107                return
108            if self._overflowed and self._drop_policy == "error":
109                self._dropped_count += 1
110                return
111            try:
112                self._queue.put_nowait(frame)
113                return
114            except queue.Full:
115                self._dropped_count += 1
116
117                if self._drop_policy == "drop_oldest":
118                    self._drop_oldest_and_enqueue(frame)
119                    return
120
121                self._overflowed = True
122                self._clear_queue_locked()
123                self._push_control_locked(_FRAME_STREAM_OVERFLOW)
124
125    def _drop_oldest_and_enqueue(self, frame: FrameEnvelope) -> None:
126        try:
127            self._queue.get_nowait()
128        except queue.Empty:
129            pass
130        try:
131            self._queue.put_nowait(frame)
132        except queue.Full:
133            pass
134
135    def _clear_queue_locked(self) -> None:
136        while True:
137            try:
138                self._queue.get_nowait()
139            except queue.Empty:
140                break
141
142    def _push_closed(self) -> None:
143        with self._lock:
144            self._push_control_locked(_FRAME_STREAM_CLOSED)
145
146    def _push_control_locked(self, item: object) -> None:
147        try:
148            self._queue.put_nowait(item)
149            return
150        except queue.Full:
151            pass
152
153        try:
154            self._queue.get_nowait()
155        except queue.Empty:
156            pass
157
158        try:
159            self._queue.put_nowait(item)
160        except queue.Full:
161            pass
FrameStream( bus: StreamBus, *, maxsize: int = 256, drop_policy: Literal['drop_oldest', 'error'] = 'drop_oldest', consumer_name: str)
consumer_name: str
dropped_count: int
overflowed: bool
closed: bool
def read( self, *, block: bool = True, timeout: float | None = None) -> modlink_sdk.FrameEnvelope:
def read_many( self, *, max_items: int | None = None) -> list[modlink_sdk.FrameEnvelope]:
def close(self) -> None:
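
A consumer-side sketch: open a frame stream from the engine's bus (ModLinkEngine exposes it as the public bus attribute, see the runtime source further down) and treat the two control signals as distinct conditions. Everything here comes from the sources in this document except the consumer name, which is illustrative.

# Sketch: consume frames with the "error" drop policy so that falling behind
# surfaces as FrameStreamOverflowError instead of silently dropping frames.
from modlink_core import FrameStreamOverflowError, ModLinkEngine, StreamClosedError

engine = ModLinkEngine()
stream = engine.bus.open_frame_stream(
    maxsize=512,
    drop_policy="error",
    consumer_name="example_consumer",
)
try:
    while True:
        try:
            frame = stream.read()  # blocks until a frame or a control signal arrives
        except StreamClosedError:
            break  # the stream was closed (possibly from another thread)
        except FrameStreamOverflowError as exc:
            print(f"consumer '{exc.consumer_name}' overflowed; reopen to resubscribe")
            break
        print("frame for stream:", frame.stream_id)
finally:
    stream.close()
    engine.shutdown()
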
class FrameStreamOverflowError(builtins.Exception):
20class FrameStreamOverflowError(Exception):
21    def __init__(self, consumer_name: str) -> None:
22        self.consumer_name = consumer_name
23        super().__init__(f"frame stream overflowed: consumer_name={consumer_name}")

Raised when a frame stream configured with the "error" drop policy overflows; the affected consumer_name is attached to the exception.

FrameStreamOverflowError(consumer_name: str)
consumer_name
def configure_host_logging( *, log_path: str | pathlib.Path | None = None, app_name: str = 'ModLink Studio', app_author: str = 'ModLink', log_filename: str = 'modlink.log', console: bool = True, debug: bool = False, max_bytes: int = 5242880, backup_count: int = 5) -> pathlib.Path:
26def configure_host_logging(
27    *,
28    log_path: str | Path | None = None,
29    app_name: str = "ModLink Studio",
30    app_author: str = "ModLink",
31    log_filename: str = "modlink.log",
32    console: bool = True,
33    debug: bool = False,
34    max_bytes: int = DEFAULT_MAX_LOG_BYTES,
35    backup_count: int = DEFAULT_BACKUP_COUNT,
36) -> Path:
37    resolved_path = (
38        Path(log_path)
39        if log_path is not None
40        else Path(user_log_path(app_name, app_author)) / log_filename
41    )
42    resolved_path.parent.mkdir(parents=True, exist_ok=True)
43
44    root_logger = logging.getLogger()
45    _remove_managed_handlers(root_logger)
46    root_logger.setLevel(logging.WARNING)
47    log_level = logging.DEBUG if debug else logging.INFO
48
49    formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
50    file_handler = RotatingFileHandler(
51        resolved_path,
52        maxBytes=max(1, int(max_bytes)),
53        backupCount=max(1, int(backup_count)),
54        encoding="utf-8",
55    )
56    file_handler.setLevel(log_level)
57    file_handler.setFormatter(formatter)
58    _mark_managed(file_handler)
59    root_logger.addHandler(file_handler)
60
61    if console:
62        console_handler = logging.StreamHandler(sys.stderr)
63        console_handler.setLevel(log_level)
64        console_handler.setFormatter(formatter)
65        _mark_managed(console_handler)
66        root_logger.addHandler(console_handler)
67
68    for logger_name in _MANAGED_LOGGER_NAMES:
69        logger = logging.getLogger(logger_name)
70        logger.setLevel(log_level)
71        logger.propagate = True
72
73    logging.captureWarnings(True)
74    logging.getLogger(__name__).info(
75        "Configured logging at %s (level=%s)",
76        resolved_path,
77        logging.getLevelName(log_level),
78    )
79    return resolved_path
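
A short usage sketch; the explicit log path below is only an illustration, and omitting log_path falls back to the per-platform user log directory.

# Sketch: route host logging to a rotating file plus stderr at DEBUG level.
import logging
from pathlib import Path

from modlink_core import configure_host_logging

log_file = configure_host_logging(
    log_path=Path("logs/modlink.log"),  # hypothetical path; omit to use the platform default
    console=True,
    debug=True,
)
logging.getLogger(__name__).info("logging initialised at %s", log_file)
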
@dataclass(frozen=True, slots=True)
class DriverSnapshot:
 7@dataclass(frozen=True, slots=True)
 8class DriverSnapshot:
 9    driver_id: str
10    display_name: str
11    supported_providers: tuple[str, ...]
12    is_running: bool
13    is_connected: bool
14    is_streaming: bool
DriverSnapshot( driver_id: str, display_name: str, supported_providers: tuple[str, ...], is_running: bool, is_connected: bool, is_streaming: bool)
driver_id: str
display_name: str
supported_providers: tuple[str, ...]
is_running: bool
is_connected: bool
is_streaming: bool
class ModLinkEngine:
 28class ModLinkEngine:
 29    """Application engine that owns shared services and driver threads."""
 30
 31    def __init__(
 32        self,
 33        *,
 34        settings_path: str | Path | None = None,
 35        settings_version: int = 1,
 36        parent: object | None = None,
 37    ) -> None:
 38        resolved_driver_factories = tuple(discover_driver_factories())
 39        logger.info(
 40            "Starting ModLink engine with %d driver factories", len(resolved_driver_factories)
 41        )
 42        self._parent = parent
 43        self._event_broker = BackendEventBroker()
 44        self._settings = SettingsStore(
 45            path=_resolve_settings_path(settings_path),
 46            version=settings_version,
 47            on_change=self._publish_setting_changed,
 48        )
 49        declare_core_settings(self._settings)
 50        if self._settings.path is not None and self._settings.path.exists():
 51            self._settings.load(ignore_unknown=True)
 52        self.bus = StreamBus(event_broker=self._event_broker, parent=self)
 53        self._recording = RecordingBackend(
 54            self.bus,
 55            settings=self._settings,
 56            publish_event=self._event_broker.publish,
 57            parent=self,
 58        )
 59        self._replay = ReplayBackend(
 60            settings=self._settings,
 61            parent=self,
 62        )
 63        self._driver_portals: dict[str, DriverPortal] = {}
 64        attached_portals: list[DriverPortal] = []
 65        try:
 66            for factory in resolved_driver_factories:
 67                portal = self._attach_driver(factory)
 68                attached_portals.append(portal)
 69            self._recording.start()
 70            self._replay.start()
 71            logger.info("ModLink engine started with %d driver portals", len(self._driver_portals))
 72        except Exception as exc:
 73            logger.exception("Engine startup failed; rolling back attached services")
 74            self._rollback_startup(attached_portals, exc)
 75            raise
 76
 77    @property
 78    def settings(self) -> SettingsStore:
 79        return self._settings
 80
 81    @property
 82    def recording(self) -> RecordingBackend:
 83        return self._recording
 84
 85    @property
 86    def replay(self) -> ReplayBackend:
 87        return self._replay
 88
 89    def recording_snapshot(self) -> RecordingSnapshot:
 90        return self._recording.snapshot()
 91
 92    def replay_snapshot(self) -> ReplaySnapshot:
 93        return self._replay.snapshot()
 94
 95    def driver_portals(self) -> tuple[DriverPortal, ...]:
 96        return tuple(self._driver_portals.values())
 97
 98    def driver_snapshots(self) -> tuple[DriverSnapshot, ...]:
 99        return tuple(portal.snapshot() for portal in self._driver_portals.values())
100
101    def driver_portal(self, driver_id: str) -> DriverPortal | None:
102        return self._driver_portals.get(driver_id)
103
104    def settings_snapshot(self) -> dict[str, Any]:
105        return self._settings.snapshot()
106
107    def open_event_stream(self, *, maxsize: int = 1024) -> EventStream:
108        return self._event_broker.open_stream(maxsize=maxsize)
109
110    def _publish_setting_changed(self, key: str, value: Any) -> None:
111        self._event_broker.publish(
112            SettingChangedEvent(key=key, value=value, ts=time.time()),
113        )
114
115    def _attach_driver(self, driver_factory: DriverFactory) -> DriverPortal:
116        portal = DriverPortal(
117            driver_factory,
118            publish_event=self._event_broker.publish,
119            frame_sink=self.bus.ingest_frame,
120            parent=self,
121        )
122        driver_id = portal.driver_id.strip()
123        if driver_id in self._driver_portals:
124            raise ValueError(f"driver_id '{driver_id}' is already installed")
125
126        descriptors = portal.descriptors()
127        self.bus.add_descriptors(descriptors)
128        try:
129            portal.start(timeout_ms=DEFAULT_DRIVER_STARTUP_TIMEOUT_MS)
130        except Exception:
131            for descriptor in descriptors:
132                self.bus.remove_descriptor(descriptor.stream_id)
133            raise
134
135        self._driver_portals[driver_id] = portal
136        return portal
137
138    def _rollback_startup(
139        self,
140        attached_portals: list[DriverPortal],
141        startup_error: Exception,
142    ) -> None:
143        cleanup_failures: list[str] = []
144        for portal in reversed(attached_portals):
145            try:
146                portal.stop()
147            except Exception as exc:
148                logger.exception(
149                    "Engine startup rollback failed while stopping driver '%s'", portal.driver_id
150                )
151                cleanup_failures.append(
152                    "cleanup failed while stopping driver "
153                    f"'{portal.driver_id}': {type(exc).__name__}: {exc}"
154                )
155            for descriptor in portal.descriptors():
156                self.bus.remove_descriptor(descriptor.stream_id)
157
158        self._driver_portals.clear()
159        try:
160            self._recording.shutdown()
161        except Exception as exc:
162            logger.exception("Engine startup rollback failed while shutting down recording")
163            cleanup_failures.append(
164                f"cleanup failed while shutting down recording: {type(exc).__name__}: {exc}"
165            )
166        try:
167            self._replay.shutdown()
168        except Exception as exc:
169            logger.exception("Engine startup rollback failed while shutting down replay")
170            cleanup_failures.append(
171                f"cleanup failed while shutting down replay: {type(exc).__name__}: {exc}"
172            )
173
174        for note in cleanup_failures:
175            startup_error.add_note(note)
176
177    def shutdown(self) -> None:
178        logger.info("Shutting down ModLink engine")
179        first_error: Exception | None = None
180        for portal in self._driver_portals.values():
181            try:
182                portal.stop()
183            except Exception as exc:
184                logger.exception(
185                    "Engine shutdown failed while stopping driver '%s'", portal.driver_id
186                )
187                if first_error is None:
188                    first_error = exc
189        self._driver_portals.clear()
190        try:
191            self._recording.shutdown()
192        except Exception as exc:
193            logger.exception("Engine shutdown failed while shutting down recording")
194            if first_error is None:
195                first_error = exc
196        try:
197            self._replay.shutdown()
198        except Exception as exc:
199            logger.exception("Engine shutdown failed while shutting down replay")
200            if first_error is None:
201                first_error = exc
202        if first_error is not None:
203            raise first_error
204        logger.info("ModLink engine shut down")

Application engine that owns shared services and driver threads.

ModLinkEngine( *, settings_path: str | pathlib.Path | None = None, settings_version: int = 1, parent: object | None = None)
bus
settings: SettingsStore
recording: RecordingBackend
replay: ReplayBackend
def recording_snapshot(self) -> RecordingSnapshot:
def replay_snapshot(self) -> ReplaySnapshot:
def driver_portals(self) -> tuple[modlink_core.drivers.portal.core.DriverPortal, ...]:
def driver_snapshots(self) -> tuple[DriverSnapshot, ...]:
def driver_portal( self, driver_id: str) -> modlink_core.drivers.portal.core.DriverPortal | None:
def settings_snapshot(self) -> dict[str, typing.Any]:
def open_event_stream(self, *, maxsize: int = 1024) -> EventStream:
def shutdown(self) -> None:
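
A minimal host lifecycle sketch tying the pieces together; the settings path is hypothetical, and all calls are taken from the engine API shown above.

# Sketch: construct the engine, inspect its services, then shut down cleanly.
from modlink_core import ModLinkEngine, configure_host_logging

configure_host_logging(debug=False)
engine = ModLinkEngine(settings_path="modlink_settings.json")  # hypothetical path
try:
    print("recording:", engine.recording_snapshot())
    print("replay:", engine.replay_snapshot())
    for snap in engine.driver_snapshots():
        print(f"driver {snap.driver_id}: streaming={snap.is_streaming}")
finally:
    # shutdown() stops every driver portal and both backends, then re-raises
    # the first error it hit only after all cleanup has been attempted.
    engine.shutdown()
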
@dataclass(frozen=True, slots=True)
class RecordingFailedEvent:
22@dataclass(frozen=True, slots=True)
23class RecordingFailedEvent:
24    recording_id: str
25    recording_path: str
26    frame_counts_by_stream: dict[str, int]
27    reason: str
28    ts_ns: int
29    kind: Literal["recording_failed"] = "recording_failed"
RecordingFailedEvent( recording_id: str, recording_path: str, frame_counts_by_stream: dict[str, int], reason: str, ts_ns: int, kind: Literal['recording_failed'] = 'recording_failed')
recording_id: str
recording_path: str
frame_counts_by_stream: dict[str, int]
reason: str
ts_ns: int
kind: Literal['recording_failed']
@dataclass(frozen=True, slots=True)
class SettingChangedEvent:
32@dataclass(frozen=True, slots=True)
33class SettingChangedEvent:
34    key: str
35    value: Any
36    ts: float
37    kind: Literal["setting_changed"] = "setting_changed"
SettingChangedEvent( key: str, value: Any, ts: float, kind: Literal['setting_changed'] = 'setting_changed')
key: str
value: Any
ts: float
kind: Literal['setting_changed']
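
Every backend event is a frozen dataclass carrying a Literal kind discriminator, so consumers can dispatch either on kind or with isinstance without reaching into driver internals; for example:

# Sketch: dispatch backend events pulled from an EventStream.
from modlink_core import RecordingFailedEvent, SettingChangedEvent

def handle_event(event: object) -> None:
    if isinstance(event, SettingChangedEvent):
        print(f"setting {event.key!r} changed to {event.value!r}")
    elif isinstance(event, RecordingFailedEvent):
        print(f"recording {event.recording_id} failed: {event.reason}")
    else:
        # DriverConnectionLostEvent, DriverExecutorFailedEvent, and any future kinds.
        print("unhandled event kind:", getattr(event, "kind", type(event).__name__))
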
class SettingsStore:
230class SettingsStore:
231    ROOT_RESERVED = frozenset({"add", "load", "path", "save", "snapshot"})
232
233    def __init__(
234        self,
235        *,
236        path: str | Path | None = None,
237        version: int = 1,
238        on_change: Any = None,
239    ) -> None:
240        object.__setattr__(self, "_path", None if path is None else Path(path))
241        object.__setattr__(self, "_version", version)
242        object.__setattr__(self, "_on_change", on_change)
243        object.__setattr__(self, "_lock", RLock())
244        object.__setattr__(self, "_root", SettingsNode(name="", parent=None, store=self))
245
246    @property
247    def path(self) -> Path | None:
248        return self._path
249
250    def add(self, **specs: SettingsSpec) -> SettingsStore:
251        with self._lock:
252            overlap = self.ROOT_RESERVED.intersection(specs)
253            if overlap:
254                reserved_name = next(iter(sorted(overlap)))
255                raise ValueError(f"reserved key: {reserved_name!r}")
256            self._root.add(**specs)
257        return self
258
259    def __getattr__(self, name: str) -> SettingsNode | SettingItem:
260        if name.startswith("_"):
261            raise AttributeError(name)
262        with self._lock:
263            return self._root.get_child(name)
264
265    def __setattr__(self, name: str, value: object) -> None:
266        if name.startswith("_"):
267            object.__setattr__(self, name, value)
268            return
269        with self._lock:
270            setattr(self._root, name, value)
271
272    def snapshot(self) -> dict[str, object]:
273        with self._lock:
274            return self._root.dump_dict()
275
276    def save(self, path: str | Path | None = None) -> None:
277        with self._lock:
278            resolved_path = self._resolve_path(path)
279            resolved_path.parent.mkdir(parents=True, exist_ok=True)
280            payload = {
281                "_version": self._version,
282                "values": self._root.dump_dict(),
283            }
284            _atomic_write_text(
285                resolved_path,
286                json.dumps(payload, indent=2, ensure_ascii=False),
287            )
288
289    def load(
290        self,
291        path: str | Path | None = None,
292        *,
293        ignore_unknown: bool = False,
294    ) -> None:
295        with self._lock:
296            resolved_path = self._resolve_path(path)
297            payload = json.loads(resolved_path.read_text(encoding="utf-8"))
298            if not isinstance(payload, dict):
299                raise TypeError("settings file must contain a JSON object")
300
301            file_version = payload.get("_version")
302            if file_version is not None and file_version != self._version:
303                raise ValueError(
304                    f"settings version mismatch: file={file_version}, expected={self._version}"
305                )
306
307            values = payload.get("values", {})
308            previous_snapshot = self.snapshot()
309            previous = self._root.collect_leaf_values()
310            self._root.reset(notify=False)
311            try:
312                self._root.apply_dict(values, notify=False, ignore_unknown=ignore_unknown)
313            except Exception:
314                self._root.reset(notify=False)
315                self._root.apply_dict(previous_snapshot, notify=False)
316                raise
317            current = self._root.collect_leaf_values()
318            for key, value in current.items():
319                if previous.get(key) != value:
320                    self._emit_change(key, value)
321
322    def _resolve_path(self, path: str | Path | None) -> Path:
323        resolved_path = self._path if path is None else Path(path)
324        if resolved_path is None:
325            raise ValueError("path is required")
326        return resolved_path
327
328    def _emit_change(self, key: str, value: object) -> None:
329        on_change = self._on_change
330        if on_change is None:
331            return
332        on_change(key, value)
SettingsStore( *, path: str | pathlib.Path | None = None, version: int = 1, on_change: Any = None)
ROOT_RESERVED = frozenset({'path', 'load', 'save', 'snapshot', 'add'})
path: pathlib.Path | None
def add( self, **specs: modlink_core.settings.spec.SettingsSpec) -> SettingsStore:
def snapshot(self) -> dict[str, object]:
def save(self, path: str | pathlib.Path | None = None) -> None:
def load( self, path: str | pathlib.Path | None = None, *, ignore_unknown: bool = False) -> None:
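
A persistence-focused sketch; declaring settings with add() requires SettingsSpec instances whose shape is not shown here, so the example sticks to the store's I/O surface.

# Sketch: persist and reload the settings tree, tolerating a version mismatch.
from pathlib import Path

from modlink_core import SettingsStore

def on_change(key: str, value: object) -> None:
    print(f"setting changed: {key} -> {value!r}")

store = SettingsStore(path=Path("settings.json"), version=1, on_change=on_change)
store.save()              # atomic write of {"_version": 1, "values": {...}}
print(store.snapshot())   # nested dict of the current values

try:
    store.load(ignore_unknown=True)  # unknown keys in the file are skipped
except ValueError as exc:
    # Raised when the file's "_version" does not match this store's version.
    print("refusing to load:", exc)
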
class StreamClosedError(builtins.Exception):
10class StreamClosedError(Exception):
11    """Raised when a blocking stream read is interrupted by stream closure."""

Raised when a blocking stream read is interrupted by stream closure.

class StreamBus:
164class StreamBus:
165    """Stores stream descriptors and fans out accepted frames to frame streams."""
166
167    _STREAM_COUNTER = count(1)
168
169    def __init__(
170        self,
171        *,
172        event_broker: BackendEventBroker,
173        parent: object | None = None,
174    ) -> None:
175        self._event_broker = event_broker
176        self._parent = parent
177        self._descriptors: dict[str, StreamDescriptor] = {}
178        self._frame_streams: list[FrameStream] = []
179        self._lock = RLock()
180
181    def add_descriptor(self, descriptor: StreamDescriptor) -> None:
182        existing_descriptor = self._descriptors.get(descriptor.stream_id)
183        if existing_descriptor is not None:
184            if existing_descriptor == descriptor:
185                return
186
187            raise ValueError(f"conflicting descriptor for stream_id '{descriptor.stream_id}'")
188
189        self._descriptors[descriptor.stream_id] = descriptor
190
191    def add_descriptors(self, descriptors: Iterable[StreamDescriptor]) -> None:
192        for descriptor in descriptors:
193            self.add_descriptor(descriptor)
194
195    def remove_descriptor(self, stream_id: str) -> None:
196        self._descriptors.pop(str(stream_id), None)
197
198    def ingest_frame(self, frame: object) -> bool:
199        if not isinstance(frame, FrameEnvelope):
200            return False
201
202        if frame.stream_id not in self._descriptors:
203            return False
204
205        with self._lock:
206            streams = tuple(self._frame_streams)
207        for stream in streams:
208            stream._publish(frame)
209        return True
210
211    def open_frame_stream(
212        self,
213        *,
214        maxsize: int = 256,
215        drop_policy: FrameDropPolicy = "drop_oldest",
216        consumer_name: str | None = None,
217    ) -> FrameStream:
218        stream = FrameStream(
219            self,
220            maxsize=maxsize,
221            drop_policy=drop_policy,
222            consumer_name=consumer_name or self._next_consumer_name(),
223        )
224        with self._lock:
225            self._frame_streams.append(stream)
226        return stream
227
228    def descriptor(self, stream_id: str) -> StreamDescriptor | None:
229        return self._descriptors.get(stream_id)
230
231    def descriptors(self) -> dict[str, StreamDescriptor]:
232        return dict(self._descriptors)
233
234    def _close_frame_stream(self, stream: FrameStream) -> None:
235        with self._lock:
236            try:
237                self._frame_streams.remove(stream)
238            except ValueError:
239                pass
240        stream._push_closed()
241
242    @classmethod
243    def _next_consumer_name(cls) -> str:
244        return f"frame_stream_{next(cls._STREAM_COUNTER)}"

Stores stream descriptors and fans out accepted frames to frame streams.

StreamBus( *, event_broker: BackendEventBroker, parent: object | None = None)
def add_descriptor(self, descriptor: modlink_sdk.StreamDescriptor) -> None:
def add_descriptors(self, descriptors: Iterable[modlink_sdk.StreamDescriptor]) -> None:
def remove_descriptor(self, stream_id: str) -> None:
def ingest_frame(self, frame: object) -> bool:
def open_frame_stream( self, *, maxsize: int = 256, drop_policy: Literal['drop_oldest', 'error'] = 'drop_oldest', consumer_name: str | None = None) -> FrameStream:
def descriptor(self, stream_id: str) -> modlink_sdk.StreamDescriptor | None:
def descriptors(self) -> dict[str, modlink_sdk.StreamDescriptor]:
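
On the producing side, a driver (or a test) registers stream descriptors and pushes FrameEnvelope instances through ingest_frame; frames whose stream_id has no registered descriptor are rejected. The sketch below assumes the modlink_sdk constructors accept the keyword arguments shown, which is an illustration only.

# Sketch: stand up a bus, register a descriptor, and fan one frame out.
# The StreamDescriptor / FrameEnvelope constructor arguments are hypothetical.
from modlink_core import BackendEventBroker, StreamBus
from modlink_sdk import FrameEnvelope, StreamDescriptor

bus = StreamBus(event_broker=BackendEventBroker())
bus.add_descriptor(StreamDescriptor(stream_id="imu/0"))   # hypothetical fields

consumer = bus.open_frame_stream(consumer_name="logger")
accepted = bus.ingest_frame(FrameEnvelope(stream_id="imu/0"))  # hypothetical fields
print("accepted:", accepted)            # False for unknown stream_ids or non-frames
print("received:", consumer.read_many())
consumer.close()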