Skip to content

Commit 550c68d

Browse files
committed
fix(batch): deduplicate ping_time + error handling for resilient parallel processing
1 parent 473ac11 commit 550c68d

1 file changed

Lines changed: 32 additions & 13 deletions

File tree

scripts/batch_processing/run_combine_daily.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,13 @@ def combine_sv(
494494
if not freq_datasets:
495495
return None
496496

497+
# Deduplicate ping_time in each freq dataset (EK80 can fire simultaneously)
498+
for i, fds in enumerate(freq_datasets):
499+
if "ping_time" in fds.dims:
500+
idx = fds.get_index("ping_time")
501+
if idx.duplicated().any():
502+
freq_datasets[i] = fds.sel(ping_time=~idx.duplicated())
503+
497504
if len(freq_datasets) == 1:
498505
combined = freq_datasets[0]
499506
else:
@@ -504,7 +511,10 @@ def combine_sv(
504511
for i, fds in enumerate(freq_datasets):
505512
if "pulse_mode" in fds:
506513
freq_datasets[i] = fds.drop_vars("pulse_mode")
507-
combined = xr.concat(freq_datasets, dim="channel")
514+
combined = xr.concat(
515+
freq_datasets, dim="channel",
516+
join="outer", fill_value=np.nan,
517+
)
508518
combined["pulse_mode"] = pm
509519

510520
combined.attrs["combined_pulse_modes"] = "short_pulse+long_pulse"
@@ -535,10 +545,14 @@ def combine_one_day(
535545

536546
t0 = time.time()
537547

538-
if product in ("mvbs", "nasc"):
539-
combined = combine_mvbs_or_nasc(day, product, suffix, data_var)
540-
else:
541-
combined = combine_sv(day, suffix)
548+
try:
549+
if product in ("mvbs", "nasc"):
550+
combined = combine_mvbs_or_nasc(day, product, suffix, data_var)
551+
else:
552+
combined = combine_sv(day, suffix)
553+
except Exception as e:
554+
log.warning(" %s/%s: FAILED — %s", day, product, e)
555+
continue
542556

543557
if combined is None:
544558
log.info(" %s/%s: no data", day, product)
@@ -846,16 +860,21 @@ def process_one_day(args: tuple) -> tuple[str, int, int]:
846860
log.info("Processing %s ...", day)
847861
t0 = time.time()
848862

849-
# Populate frequency cache from denoised/raw zarrs
850-
_CHANNEL_FREQ_CACHE.clear()
851-
_populate_freq_cache(day)
863+
try:
864+
# Populate frequency cache from denoised/raw zarrs
865+
_CHANNEL_FREQ_CACHE.clear()
866+
_populate_freq_cache(day)
852867

853-
combined_zarrs = combine_one_day(day, products, skip_existing=skip_existing)
868+
combined_zarrs = combine_one_day(day, products, skip_existing=skip_existing)
854869

855-
n_echograms = 0
856-
if not skip_echograms and combined_zarrs:
857-
echogram_files = generate_echograms_for_day(day, combined_zarrs, ECHOGRAM_DIR)
858-
n_echograms = len(echogram_files)
870+
n_echograms = 0
871+
if not skip_echograms and combined_zarrs:
872+
echogram_files = generate_echograms_for_day(day, combined_zarrs, ECHOGRAM_DIR)
873+
n_echograms = len(echogram_files)
874+
except Exception as e:
875+
log.error(" %s FAILED: %s", day, e)
876+
_release_memory()
877+
return day, 0, 0
859878

860879
dt = time.time() - t0
861880
log.info(

0 commit comments

Comments
 (0)