Skip to content

Commit 71036f9

Browse files
committed
fix: use global_sample_index (per-channel header index) instead of total_samples_written (inflated by multi-ch payloads)
1 parent 32a1f85 commit 71036f9

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

examples/joint_angle_regression/open_ephys_lsl_streamer.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def __init__(
151151
self.detected_fs = 0.0 # filled after connect
152152
self._header_fs = 0.0 # from ZMQ header field
153153
self._measured_fs = 0 # empirical throughput
154-
self._prev_written = 0 # track ref-channel total_samples_written
154+
self._prev_idx = 0 # track global_sample_index (per-channel)
155155

156156
@staticmethod
157157
def _round_fs(raw: float) -> float:
@@ -174,7 +174,8 @@ def _wait_for_channels(self, timeout=3.0):
174174
cross-validate the header-reported ``sample_rate``.
175175
176176
Returns ``(n_channels, measured_fs)`` where *measured_fs* is
177-
samples-per-second computed from ``total_samples_written``.
177+
samples-per-second computed from ``global_sample_index`` (header-
178+
based per-channel index, not the raw payload byte count).
178179
"""
179180
import time as _t
180181

@@ -183,9 +184,9 @@ def _wait_for_channels(self, timeout=3.0):
183184
stable_since = start
184185
channels_stable = False
185186

186-
# snapshot sample counter at start
187+
# snapshot the per-channel sample index at start
187188
with self.client._lock:
188-
samples_t0 = int(self.client.total_samples_written)
189+
idx_t0 = int(self.client.global_sample_index)
189190

190191
while (_t.time() - start) < timeout:
191192
with self.client._lock:
@@ -197,8 +198,7 @@ def _wait_for_channels(self, timeout=3.0):
197198
channels_stable = True
198199
# Keep looping a bit longer to accumulate a better fs estimate.
199200
# We want at least 1 s of data total for a reliable rate.
200-
min_end = start + 1.5
201-
if _t.time() >= min_end:
201+
if _t.time() >= start + 1.5:
202202
break
203203
elif channels_stable:
204204
if _t.time() >= start + 1.5:
@@ -207,8 +207,8 @@ def _wait_for_channels(self, timeout=3.0):
207207

208208
elapsed = max(_t.time() - start, 1e-6)
209209
with self.client._lock:
210-
samples_t1 = int(self.client.total_samples_written)
211-
measured_fs = (samples_t1 - samples_t0) / elapsed
210+
idx_t1 = int(self.client.global_sample_index)
211+
measured_fs = (idx_t1 - idx_t0) / elapsed
212212
return prev_count, measured_fs
213213

214214
def start(self):
@@ -306,9 +306,9 @@ def start(self):
306306
except Exception:
307307
self.imu_client = None
308308

309-
# Sync drain cursor to ref channel's total_samples_written
309+
# Sync drain cursor to global_sample_index (per-channel header index)
310310
with self.client._lock:
311-
self._prev_written = int(self.client.total_samples_written)
311+
self._prev_idx = int(self.client.global_sample_index)
312312

313313
self.running = True
314314
self.last_poll = _now()
@@ -355,12 +355,13 @@ def poll_once(self):
355355
self.last_poll = now
356356
info["rate_hz"] = 1.0 / dt
357357

358-
# Use total_samples_written (incremented every ref-channel packet) as cursor.
359-
# This is a monotonically increasing counter of how many samples the ref
360-
# channel has written — reliable and independent of header-index math.
358+
# Use global_sample_index (header-based per-channel index) as cursor.
359+
# This tracks sample_num+num_samples from ZMQ headers and represents
360+
# the true per-channel sample count, unlike total_samples_written which
361+
# uses the raw payload size (may be inflated by multi-channel payloads).
361362
with self.client._lock:
362-
total_w = int(self.client.total_samples_written)
363-
n_new = total_w - self._prev_written
363+
cur_idx = int(self.client.global_sample_index)
364+
n_new = cur_idx - self._prev_idx
364365
if n_new <= 0:
365366
return info
366367
# Cap to buffer length
@@ -382,7 +383,7 @@ def poll_once(self):
382383
start_idx = blen - take
383384
for j in range(take):
384385
emg_arr[i, n_new - take + j] = buf[start_idx + j]
385-
self._prev_written = total_w
386+
self._prev_idx = cur_idx
386387

387388
# emg_arr: (channels, n_new) → transpose to (n_new, channels)
388389
emg = emg_arr.T
@@ -393,7 +394,7 @@ def poll_once(self):
393394
if n_samples <= 0:
394395
return info
395396

396-
fs = float(self.client.fs) if float(self.client.fs) > 0 else self.expected_fs
397+
fs = self.detected_fs if self.detected_fs > 0 else self.expected_fs
397398
ts_end = _now()
398399
ts = ts_end - (np.arange(n_samples, dtype=np.float64)[::-1] / fs)
399400

0 commit comments

Comments
 (0)