Commit a562c5d

Refactor send_to_zarr() and update_zarr(): make both entry points async and compatible with the latest botocore libraries.
Parent: 59d0473

2 files changed: 95 additions & 99 deletions

OceanDataStore/cli/main_cli.py

Lines changed: 5 additions & 2 deletions
@@ -12,6 +12,7 @@
 """
 import sys
 import json
+import asyncio
 import logging
 
 from ..object_store_handler import send_to_zarr, send_to_icechunk, update_zarr, update_icechunk, list_objects
@@ -98,6 +99,7 @@ def process_action(args):
     # === Process Actions === #
     if args.action == "send_to_zarr":
 
+        asyncio.run(
         send_to_zarr(
             file=filepaths,
             bucket=args.bucket,
@@ -112,10 +114,11 @@ def process_action(args):
             dask_config_kwargs=dask_config["config_kwargs"],
             dask_cluster_kwargs=dask_config["cluster_kwargs"],
             zarr_version=zarr_version,
-        )
+        ))
 
     elif args.action == "update_zarr":
 
+        asyncio.run(
         update_zarr(
             file=filepaths,
             bucket=args.bucket,
@@ -130,7 +133,7 @@ def process_action(args):
             dask_config_kwargs=dask_config["config_kwargs"],
             dask_cluster_kwargs=dask_config["cluster_kwargs"],
             zarr_version=zarr_version,
-        )
+        ))
 
     elif args.action == "send_to_icechunk":
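With this commit, send_to_zarr() and update_zarr() are coroutines, so the CLI drives them through asyncio.run(); library callers must do the same. A minimal sketch of the new call pattern. The file path, bucket, prefix and credentials filename below are placeholders, not values from this commit, and the remaining keyword arguments are assumed to keep their defaults:

    import asyncio

    from OceanDataStore.object_store_handler import send_to_zarr

    # Placeholder arguments for illustration only:
    asyncio.run(
        send_to_zarr(
            file="ocean_T2000.nc",
            bucket="my-bucket",
            object_prefix="ocean-T2000",
            store_credentials_json="credentials.json",
            zarr_version=3,
        )
    )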
OceanDataStore/object_store_handler.py

Lines changed: 90 additions & 97 deletions
@@ -14,7 +14,6 @@
 import glob
 import time
 import logging
-import asyncio
 import warnings
 from typing import Optional
 
@@ -188,8 +187,6 @@ async def _check_zarr_compatibility(data: xr.DataArray | xr.Dataset,
         await _close_session(obj_store=obj_store)
         raise ChunkSizeError(chunks=rechunk, store_chunks=ds_store.chunks)
 
-    await _close_session(obj_store=obj_store)
-
 
 def _check_icechunk_compatibility(data: xr.DataArray | xr.Dataset,
                                   dest: str,
@@ -311,8 +308,6 @@ async def _write_to_zarr(data: xr.DataArray | xr.Dataset,
         warnings.simplefilter(action="ignore", category=UserWarning)
         data.to_zarr(store=store, mode="w", zarr_format=version)
 
-    await _close_session(obj_store=obj_store)
-
 
 def _write_to_icechunk(data: xr.DataArray | xr.Dataset,
                        dest: str,
@@ -384,8 +379,6 @@ async def _append_to_zarr(data: xr.DataArray | xr.Dataset,
         warnings.simplefilter(action="ignore", category=UserWarning)
         data.to_zarr(store=store, append_dim=append_dim, zarr_format=version)
 
-    await _close_session(obj_store=obj_store)
-
 
 def _append_to_icechunk(data: xr.DataArray | xr.Dataset,
                         dest: str,
@@ -462,8 +455,6 @@ async def _replace_in_zarr(data: xr.DataArray | xr.Dataset,
         warnings.simplefilter(action="ignore", category=UserWarning)
         data.to_zarr(store=store, region=region, zarr_format=version)
 
-    await _close_session(obj_store=obj_store)
-
 
 def _replace_in_icechunk(data: xr.DataArray | xr.Dataset,
                          dest: str,
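The four hunks above share one intent: _write_to_zarr(), _append_to_zarr() and _replace_in_zarr() no longer close the object-store session after a successful write, and _check_zarr_compatibility() keeps only its error-path close before raising ChunkSizeError. Session teardown now happens once, in the coroutine that opened the session (_send_to_zarr() and _update_zarr() below). A sketch of that ownership rule; writer() stands in for any of the helpers above, and the try/finally is illustrative rather than a structure this commit uses:

    # Sketch only: the session lifecycle belongs to the orchestrating coroutine.
    async def writer(data, obj_store, dest):
        ...  # writers now touch only the data, never the session

    async def orchestrator(data, obj_store, dest):
        try:
            await writer(data, obj_store, dest)
        finally:
            # Whoever opened the session closes it, exactly once.
            await _close_session(obj_store=obj_store)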
@@ -909,7 +900,7 @@ def _preprocess_dataset(file: list[str] | str | xr.Dataset,
     return ds_filepath
 
 
-def _send_to_zarr(
+async def _send_to_zarr(
     file: list[str] | str | xr.Dataset,
     bucket: str,
     object_prefix: str,
@@ -984,23 +975,24 @@ def _send_to_zarr(
     # Write to Zarr store:
     dest = f"{bucket}/{object_prefix}"
     logging.info(f"Sending Dataset to {dest}")
-    asyncio.run(
-        _write_to_zarr(data=ds_filepath[variables],
-                       obj_store=obj_store,
-                       dest=dest,
-                       version=zarr_version
-                       )
-    )
+    await _write_to_zarr(data=ds_filepath[variables],
+                         obj_store=obj_store,
+                         dest=dest,
+                         version=zarr_version
+                         )
 
     # Shutdown Object Store session on all Dask workers:
     if client is not None:
-        client.run(_close_session, (obj_store), wait=True)
-
-    # Release resources to avoid memory leaks:
-    ds_filepath.close()
+        # Release resources to avoid memory leaks:
+        ds_filepath.close()
+        client.run(_close_session, obj_store, wait=True)
+    else:
+        # Release resources to avoid memory leaks:
+        ds_filepath.close()
+        await _close_session(obj_store=obj_store)
 
 
-def send_to_zarr(
+async def send_to_zarr(
     file: list[str] | str | xr.Dataset,
     bucket: str,
     object_prefix: str,
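Two distinct changes are folded into this cleanup block. Dropping the parentheses in client.run(_close_session, (obj_store), wait=True) is purely cosmetic: (obj_store) without a trailing comma is not a tuple, so behaviour is identical, but the old form read as if a tuple were intended. The functional fix is the new else branch: without a Dask client the session was previously never closed here, whereas it is now awaited directly. Condensed, the resulting logic is:

    if client is not None:
        ds_filepath.close()                               # release the dataset first
        client.run(_close_session, obj_store, wait=True)  # close the session on every Dask worker
    else:
        ds_filepath.close()
        await _close_session(obj_store=obj_store)         # close the local session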
@@ -1063,42 +1055,42 @@ def send_to_zarr(
         # Catch UserWarnings when rechunking data:
         client.register_worker_plugin(CaptureWarningsPlugin())
 
-        _send_to_zarr(file=file,
-                      bucket=bucket,
-                      object_prefix=object_prefix,
-                      store_credentials_json=store_credentials_json,
-                      client=client,
-                      variables=variables,
-                      append_dim=append_dim,
-                      grid_filepath=grid_filepath,
-                      update_coords=update_coords,
-                      rechunk=rechunk,
-                      attrs=attrs,
-                      parallel=True,
-                      zarr_version=zarr_version
-                      )
+        await _send_to_zarr(file=file,
+                            bucket=bucket,
+                            object_prefix=object_prefix,
+                            store_credentials_json=store_credentials_json,
+                            client=client,
+                            variables=variables,
+                            append_dim=append_dim,
+                            grid_filepath=grid_filepath,
+                            update_coords=update_coords,
+                            rechunk=rechunk,
+                            attrs=attrs,
+                            parallel=True,
+                            zarr_version=zarr_version
+                            )
 
         # --- Shutdown Store & Dask Cluster --- #
+        cluster.close()
         client.shutdown()
-        client.close()
         logging.info("Dask Cluster has been shutdown.")
 
     else:
         # === Send to Zarr store without Dask === #
-        _send_to_zarr(file=file,
-                      bucket=bucket,
-                      object_prefix=object_prefix,
-                      store_credentials_json=store_credentials_json,
-                      client=None,
-                      variables=variables,
-                      append_dim=append_dim,
-                      grid_filepath=grid_filepath,
-                      update_coords=update_coords,
-                      rechunk=rechunk,
-                      attrs=attrs,
-                      parallel=False,
-                      zarr_version=zarr_version
-                      )
+        await _send_to_zarr(file=file,
+                            bucket=bucket,
+                            object_prefix=object_prefix,
+                            store_credentials_json=store_credentials_json,
+                            client=None,
+                            variables=variables,
+                            append_dim=append_dim,
+                            grid_filepath=grid_filepath,
+                            update_coords=update_coords,
+                            rechunk=rechunk,
+                            attrs=attrs,
+                            parallel=False,
+                            zarr_version=zarr_version
+                            )
 
 
 def _send_to_icechunk(
@@ -1328,8 +1320,8 @@ def send_to_icechunk(
                            )
 
         # --- Shutdown Store & Dask Cluster --- #
+        cluster.close()
         client.shutdown()
-        client.close()
         logging.info("Dask Cluster has been shutdown.")
 
     else:
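The Dask teardown is also normalized across all four public entry points: close the cluster first, then call client.shutdown(), and drop the redundant client.close() (shutdown already stops the scheduler and workers and disconnects the client). A sketch of the resulting sequence, assuming a dask.distributed LocalCluster/Client pair like the one these functions appear to build from dask_cluster_kwargs:

    import logging

    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster()  # cluster kwargs omitted for brevity
    client = Client(cluster)
    try:
        ...  # write / update work happens here
    finally:
        # Order used by this commit: cluster first, then client.shutdown().
        cluster.close()
        client.shutdown()
        logging.info("Dask Cluster has been shutdown.")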
@@ -1352,7 +1344,7 @@ def send_to_icechunk(
                            )
 
 
-def _update_zarr(
+async def _update_zarr(
     file: list[str] | str | xr.Dataset,
     bucket: str,
     object_prefix: str,
@@ -1431,25 +1423,26 @@ def _update_zarr(
     # Write to Zarr store:
     dest = f"{bucket}/{object_prefix}"
     logging.info(f"Updating Dataset at {dest}")
-    asyncio.run(
-        _update_zarr_store(data=ds_filepath[variables],
-                           obj_store=obj_store,
-                           dest=dest,
-                           append_dim=append_dim,
-                           rechunk=rechunk,
-                           version=zarr_version
-                           )
-    )
+    await _update_zarr_store(data=ds_filepath[variables],
+                             obj_store=obj_store,
+                             dest=dest,
+                             append_dim=append_dim,
+                             rechunk=rechunk,
+                             version=zarr_version
+                             )
 
     # Shutdown Object Store session on all Dask workers:
     if client is not None:
-        client.run(_close_session, (obj_store), wait=True)
-
-    # Release resources to avoid memory leaks:
-    ds_filepath.close()
+        # Release resources to avoid memory leaks:
+        ds_filepath.close()
+        client.run(_close_session, obj_store, wait=True)
+    else:
+        await _close_session(obj_store=obj_store)
+        # Release resources to avoid memory leaks:
+        ds_filepath.close()
 
 
-def update_zarr(
+async def update_zarr(
     file: list[str] | str | xr.Dataset,
     bucket: str,
     object_prefix: str,
15131506
# Catch UserWarnings when rechunking data:
15141507
client.register_worker_plugin(CaptureWarningsPlugin())
15151508

1516-
_update_zarr(file=file,
1517-
bucket=bucket,
1518-
object_prefix=object_prefix,
1519-
store_credentials_json=store_credentials_json,
1520-
client=client,
1521-
variables=variables,
1522-
append_dim=append_dim,
1523-
grid_filepath=grid_filepath,
1524-
update_coords=update_coords,
1525-
rechunk=rechunk,
1526-
attrs=attrs,
1527-
parallel=True,
1528-
zarr_version=zarr_version
1529-
)
1509+
await _update_zarr(file=file,
1510+
bucket=bucket,
1511+
object_prefix=object_prefix,
1512+
store_credentials_json=store_credentials_json,
1513+
client=client,
1514+
variables=variables,
1515+
append_dim=append_dim,
1516+
grid_filepath=grid_filepath,
1517+
update_coords=update_coords,
1518+
rechunk=rechunk,
1519+
attrs=attrs,
1520+
parallel=True,
1521+
zarr_version=zarr_version
1522+
)
15301523

15311524
# --- Shutdown Store & Dask Cluster --- #
1525+
cluster.close()
15321526
client.shutdown()
1533-
client.close()
15341527
logging.info("Dask Cluster has been shutdown.")
15351528

15361529
else:
15371530
# === Update Zarr store without Dask === #
1538-
_update_zarr(file=file,
1539-
bucket=bucket,
1540-
object_prefix=object_prefix,
1541-
store_credentials_json=store_credentials_json,
1542-
client=None,
1543-
variables=variables,
1544-
append_dim=append_dim,
1545-
grid_filepath=grid_filepath,
1546-
update_coords=update_coords,
1547-
rechunk=rechunk,
1548-
attrs=attrs,
1549-
parallel=False,
1550-
zarr_version=zarr_version
1551-
)
1531+
await _update_zarr(file=file,
1532+
bucket=bucket,
1533+
object_prefix=object_prefix,
1534+
store_credentials_json=store_credentials_json,
1535+
client=None,
1536+
variables=variables,
1537+
append_dim=append_dim,
1538+
grid_filepath=grid_filepath,
1539+
update_coords=update_coords,
1540+
rechunk=rechunk,
1541+
attrs=attrs,
1542+
parallel=False,
1543+
zarr_version=zarr_version
1544+
)
15521545

15531546

15541547
def _update_icechunk(
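update_zarr() follows the same pattern as send_to_zarr(): it is now a coroutine and must be awaited or driven by an event loop. A minimal sketch of appending new data along an existing dimension; all argument values are placeholders, and append_dim="time" assumes the target store was written with a time dimension to extend:

    import asyncio

    from OceanDataStore.object_store_handler import update_zarr

    # Placeholder arguments for illustration only:
    asyncio.run(
        update_zarr(
            file="ocean_T2001.nc",
            bucket="my-bucket",
            object_prefix="ocean-T2000",
            store_credentials_json="credentials.json",
            append_dim="time",
            zarr_version=3,
        )
    )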
@@ -1747,8 +1740,8 @@ def update_icechunk(
                            )
 
         # --- Shutdown Store & Dask Cluster --- #
+        cluster.close()
         client.shutdown()
-        client.close()
         logging.info("Dask Cluster has been shutdown.")
 
     else:
