diff --git a/README.md b/README.md index 5cc5979..caecb84 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,25 @@ pip install git+https://github.com/ezmsg-org/ezmsg-lsl@dev See the `examples` folder for more details. +### LSL Outlet + +`LSLOutletUnit` publishes incoming `AxisArray` messages to an LSL stream. Key settings (`LSLOutletSettings`): + +* `stream_name` / `stream_type` — name and type advertised on the LSL stream. +* `use_message_timestamp` (default `True`) — push samples with the incoming message timestamps, or with `pylsl.local_clock()` when `False`. +* `assume_lsl_clock` (default `False`) — when `use_message_timestamp` is `True`, whether the incoming timestamps are already in the LSL clock (otherwise they are converted from the system clock). +* `sync_blocking` (default `False`) — when `True`, the outlet is created with the `pylsl.transp_sync_blocking` transport flag, enabling the synchronous (zero-copy) outlet mode for high-bandwidth streams. Requires `pylsl >= 1.18.3b1`. Leave `False` for the standard asynchronous transport. + +```python +from ezmsg.lsl.outlet import LSLOutletUnit + +outlet = LSLOutletUnit( + stream_name="my_stream", + stream_type="EEG", + sync_blocking=True, # synchronous zero-copy transport +) +``` + ## Developers We use [`uv`](https://docs.astral.sh/uv/getting-started/installation/) for development. It is not strictly required, but if you intend to contribute to ezmsg-lsl then using `uv` will lead to the smoothest collaboration. diff --git a/pyproject.toml b/pyproject.toml index 85003a5..e46d888 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "ezmsg>=3.9.0", "ezmsg-baseproc>=1.7.0", "numpy>=1.26.4", - "pylsl>=1.18.2", + "pylsl>=1.18.3b1", ] dynamic = ["version"] diff --git a/src/ezmsg/lsl/outlet.py b/src/ezmsg/lsl/outlet.py index c6e4bbe..1374c9d 100644 --- a/src/ezmsg/lsl/outlet.py +++ b/src/ezmsg/lsl/outlet.py @@ -100,6 +100,13 @@ class LSLOutletSettings(ez.Settings): Note: Ignored when use_message_timestamp is False. """ + sync_blocking: bool = False + """ + When `True`, the outlet is created with the `pylsl.transp_sync_blocking` transport flag, enabling the + synchronous (zero-copy) outlet mode for high-bandwidth streams. When `False` (default), the outlet uses + the standard asynchronous transport. + """ + @processor_state class LSLOutletProcessorState: @@ -148,7 +155,8 @@ def _reset_state(self, message: AxisArray) -> None: source_id="ezmsg-" + source_id, ) populate_desc_from_axisarray(info, message, out_size=out_size) - self._state.outlet = pylsl.StreamOutlet(info) + transport_flags = pylsl.transp_sync_blocking if self.settings.sync_blocking else pylsl.transp_default + self._state.outlet = pylsl.StreamOutlet(info, transport_flags=transport_flags) def _process(self, message: AxisArray) -> None: dat = message.data diff --git a/tests/test_outlet.py b/tests/test_outlet.py index e3beff8..a683b5f 100644 --- a/tests/test_outlet.py +++ b/tests/test_outlet.py @@ -6,18 +6,20 @@ import tempfile import typing from pathlib import Path +from unittest import mock import ezmsg.core as ez +import numpy as np +import pylsl from ezmsg.baseproc.clock import Clock from ezmsg.util.messagecodec import message_log from ezmsg.util.messagelogger import MessageLogger -from ezmsg.util.messages.axisarray import AxisArray +from ezmsg.util.messages.axisarray import AxisArray, CoordinateAxis from ezmsg.util.terminate import TerminateOnTotal - -from ezmsg.lsl.outlet import LSLOutletUnit - from helpers.synth import Oscillator +from ezmsg.lsl.outlet import LSLOutletSettings, LSLOutletUnit, OutletProcessor + def test_outlet_system(): n_messages = 10 @@ -45,3 +47,37 @@ def test_outlet_system(): # We merely verify that the messages are being sent to the logger. assert len(messages) >= n_messages + + +def _make_msg() -> AxisArray: + return AxisArray( + data=np.zeros((5, 3), dtype=np.int16), + dims=["time", "ch"], + axes={ + "time": AxisArray.TimeAxis(fs=100.0, offset=0.0), + "ch": CoordinateAxis(data=np.array(["a", "b", "c"]), dims=["ch"]), + }, + attrs={}, + key="test", + ) + + +def _transport_flags_for(sync_blocking: bool) -> int: + """Build an outlet via OutletProcessor and capture the transport_flags + passed to pylsl.StreamOutlet, without touching the LSL network.""" + proc = OutletProcessor( + settings=LSLOutletSettings(stream_name="test", stream_type="EEG", sync_blocking=sync_blocking) + ) + with mock.patch("ezmsg.lsl.outlet.pylsl.StreamOutlet") as MockOutlet: + proc._reset_state(_make_msg()) + return MockOutlet.call_args.kwargs["transport_flags"] + + +def test_outlet_default_transport_flags(): + """By default the outlet is created with the standard async transport.""" + assert _transport_flags_for(sync_blocking=False) == pylsl.transp_default + + +def test_outlet_sync_blocking_transport_flags(): + """sync_blocking=True sets the transp_sync_blocking transport flag.""" + assert _transport_flags_for(sync_blocking=True) == pylsl.transp_sync_blocking