Skip to content

Commit e48a17f

Browse files
authored
Merge pull request #176 from PanDAWMS/next
3.10.5.57
2 parents 18498c7 + 791522f commit e48a17f

34 files changed

Lines changed: 3003 additions & 195 deletions

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.10.4.12
1+
3.10.5.57

pilot.py

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,17 @@
7474
get_panda_server,
7575
https_setup,
7676
send_update,
77-
update_local_oidc_token_info
77+
update_local_oidc_token_info,
78+
get_memory_limits
7879
)
7980
from pilot.util.loggingsupport import establish_logging
8081
from pilot.util.networking import dump_ipv6_info
8182
from pilot.util.processgroups import find_defunct_subprocesses
8283
from pilot.util.timing import add_to_pilot_timing
8384
from pilot.util.workernode import (
8485
get_node_name,
85-
get_workernode_map
86+
get_workernode_map,
87+
get_workernode_gpu_map
8688
)
8789

8890
errors = ErrorCodes()
@@ -92,7 +94,7 @@
9294
trace = None
9395

9496

95-
def main() -> int:
97+
def main() -> int: # noqa: C901
9698
"""
9799
Prepare for and execute the requested workflow.
98100
@@ -129,11 +131,12 @@ def main() -> int:
129131
"started", args.queue, args.url, args.port, logger, "IPv6"
130132
) # note: assuming IPv6, fallback in place
131133

132-
# check cvmfs if available
133-
ec = check_cvmfs(logger)
134-
if ec:
135-
cvmfs_diagnostics()
136-
return ec
134+
# check cvmfs if available (skip test if either NO_CVMFS_OK env var is set or pilot option --nocvmfs is used)
135+
if args.cvmfs:
136+
ec = check_cvmfs(logger)
137+
if ec:
138+
cvmfs_diagnostics()
139+
return ec
137140

138141
if not args.rucio_host:
139142
args.rucio_host = config.Rucio.host
@@ -142,6 +145,7 @@ def main() -> int:
142145
try:
143146
infosys.init(args.queue)
144147
pilot_cache.queuedata = infosys.queuedata
148+
pilot_cache.harvester_submitmode = args.harvester_submitmode.lower()
145149

146150
# check if queue is ACTIVE
147151
if infosys.queuedata.state != "ACTIVE":
@@ -165,6 +169,14 @@ def main() -> int:
165169
send_workernode_map(infosys.queuedata.site, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
166170
except Exception as error:
167171
logger.warning(f"exception caught when sending workernode map: {error}")
172+
try:
173+
memory_limits = get_memory_limits(args.url, args.port)
174+
except Exception as error:
175+
logger.warning(f"exception caught when getting resource types: {error}")
176+
else:
177+
logger.debug(f"resource types: {memory_limits}")
178+
if memory_limits:
179+
pilot_cache.resource_types = memory_limits
168180

169181
# handle special CRIC variables via params
170182
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
@@ -360,6 +372,16 @@ def get_args() -> Any:
360372
help="Pilot leasetime seconds (default: 3600 s)",
361373
)
362374

375+
# Disable cvmfs checks
376+
arg_parser.add_argument(
377+
"-b",
378+
"--nocvmfs",
379+
dest="cvmfs",
380+
action="store_false",
381+
default=True,
382+
help="Disable cvmfs checks",
383+
)
384+
363385
# set the appropriate site, resource and queue
364386
arg_parser.add_argument(
365387
"-q",
@@ -948,7 +970,7 @@ def send_workernode_map(
948970
logger: Any,
949971
):
950972
"""
951-
Send worker node map to the server.
973+
Send worker node map and GPU info to the server.
952974
953975
:param site: ATLAS site name (str)
954976
:param url: server url (str)
@@ -961,8 +983,21 @@ def send_workernode_map(
961983
data = get_workernode_map(site)
962984
except Exception as e:
963985
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
964-
else:
986+
try:
965987
send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
988+
except Exception as e:
989+
logger.warning(f"exception caught when sending worker node map to server: {e}")
990+
991+
# GPU info
992+
try:
993+
data = get_workernode_gpu_map(site)
994+
except Exception as e:
995+
logger.warning(f"exception caught when calling get_workernode_gpu_map(): {e}")
996+
try:
997+
if data: # only send if data is not empty
998+
send_update("api/v1/pilot/update_worker_node_gpu", data, url, port, ipv=internet_protocol_version, max_attempts=1)
999+
except Exception as e:
1000+
logger.warning(f"exception caught when sending worker node map to server: {e}")
9661001

9671002

9681003
def set_lifetime():

pilot/common/pilotcache.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def __init__(self):
4040
self.pilot_home_dir = None
4141
self.current_job_id = None
4242
self.current_job_state = None
43+
self.source_site = None
44+
self.destination_site = None
45+
self.resource_types = None
46+
self.harvester_submitmode = None
4347

4448
def get_pids(self):
4549
"""
@@ -57,7 +61,9 @@ def add_cgroup(self, key, value):
5761
Normally, the process id would be used as the key, and a
5862
typical value will be the path to the cgroup.
5963
60-
This is used to keep track of the cgroups for each process.
64+
The key value can also be a string that identifies a group of processes,
65+
such as "subprocesses". This allows for grouping processes under a
66+
common identifier, which can be useful for monitoring or management purposes.
6167
6268
Args:
6369
key (str): Key for the cgroups entry.

pilot/control/data.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,11 @@ def copy_special_files(tardir: str):
861861
# store the workernode map
862862
try:
863863
path = os.path.join(pilot_home, config.Workernode.map)
864-
copy(path, tardir)
864+
if os.path.exists(path):
865+
copy(path, tardir)
866+
path = os.path.join(pilot_home, config.Workernode.gpu_map)
867+
if os.path.exists(path):
868+
copy(path, tardir)
865869
except (NoSuchFile, FileHandlingFailure) as exc:
866870
logger.warning(f'failed to copy workernode map: {exc}')
867871

pilot/control/job.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@
144144
get_cpu_model,
145145
get_disk_space,
146146
get_node_name,
147-
update_modelstring
147+
update_modelstring,
148+
extract_site_and_schedd
148149
)
149150

150151
errors = ErrorCodes()
@@ -834,6 +835,13 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata:
834835
add_timing_and_extracts(data, job, state, args)
835836
https.add_error_codes(data, job)
836837

838+
# glidein information, currently only relevant for EIC and generic pilots
839+
if args.pilot_user.lower() == 'eic' or args.pilot_user.lower() == 'generic':
840+
glidein_site, remote_schedd_name = extract_site_and_schedd()
841+
if glidein_site and remote_schedd_name:
842+
data['source_site'] = remote_schedd_name
843+
data['destination_site'] = glidein_site
844+
837845
return data
838846

839847

pilot/control/monitor.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"""Functions for monitoring of pilot and threads."""
2727

2828
import logging
29+
import os
2930
import threading
3031
import time
3132
import re
@@ -39,10 +40,12 @@
3940
from typing import Any
4041

4142
from pilot.common.exception import PilotException, ExceededMaxWaitTime
43+
from pilot.common.pilotcache import get_pilot_cache
4244
from pilot.util.auxiliary import (
4345
check_for_final_server_update,
4446
set_pilot_state
4547
)
48+
from pilot.util.cgroups import monitor_cgroup
4649
from pilot.util.common import is_pilot_check
4750
from pilot.util.config import config
4851
from pilot.util.constants import MAX_KILL_WAIT_TIME
@@ -61,9 +64,41 @@
6164
)
6265
from pilot.util.timing import get_time_since_start
6366

67+
pilot_cache = get_pilot_cache()
6468
logger = logging.getLogger(__name__)
6569

6670

71+
def cgroup_control(queues: namedtuple, traces: Any, args: object): # noqa: C901
72+
"""
73+
Control function for the cgroup monitor.
74+
75+
This function is called from the main control thread to set up the cgroup monitor task.
76+
77+
Args:
78+
queues: internal queues for job handling (namedtuple)
79+
traces: tuple containing internal pilot states (Any)
80+
args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc) (object)
81+
"""
82+
if queues or traces: # to bypass pylint warning
83+
pass
84+
85+
# set up the periodic cgroup monitor task
86+
while not args.graceful_stop.is_set():
87+
pilot_cgroup_path = pilot_cache.get_cgroup(os.getpid())
88+
logger.debug(f"monitoring pilot cgroup at path: {pilot_cgroup_path}")
89+
if pilot_cgroup_path:
90+
monitor_cgroup(pilot_cgroup_path)
91+
92+
subprocesses_cgroup_path = pilot_cache.get_cgroup('subprocesses')
93+
logger.debug(f"monitoring subprocesses cgroup at path: {subprocesses_cgroup_path}")
94+
if subprocesses_cgroup_path:
95+
monitor_cgroup(subprocesses_cgroup_path)
96+
97+
time.sleep(60)
98+
99+
logger.info("[monitor] cgroup control has ended")
100+
101+
67102
def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
68103
"""
69104
Monitor threads.
@@ -89,9 +124,12 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
89124
tcpu = t_0
90125
last_minute_check = t_0
91126

92-
queuedata = get_queuedata_from_job(queues)
127+
queuedata = pilot_cache.queuedata
128+
if not queuedata:
129+
logger.warning("no queuedata in pilot cache, will try to extract it from queues")
130+
queuedata = get_queuedata_from_job(queues)
93131
if not queuedata:
94-
logger.warning('queuedata could not be extracted from queues')
132+
logger.warning('queuedata could not be extracted from queues either')
95133

96134
try:
97135
# overall loop counter (ignoring the fact that more than one job may be running)
@@ -137,6 +175,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
137175
logger.info(f'using max running time = {max_running_time}s')
138176

139177
# if start_time for the current job is known (push queues), a more detailed check can be performed
178+
start_time_ok = False
140179
if start_time and queuedata: # in epoch seconds
141180
time_since_job_start = int(time.time()) - start_time
142181
# in this case, max_running_time is the max job walltime
@@ -147,11 +186,12 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
147186
reached_maxtime_abort(args)
148187
break
149188
else:
150-
logger.debug(f'time since job start ({time_since_job_start}s) is within the limit ({limit}s)')
189+
logger.info(f'time since job start ({time_since_job_start}s) is within the limit ({limit}s)')
151190
logger.debug(f'max running time = {max_running_time}s, queuedata.pilot_walltime_grace = {queuedata.pilot_walltime_grace}')
191+
start_time_ok = True
152192

153193
# fallback to max_running_time if start_time is not known
154-
if time_since_start > max_running_time - grace_time:
194+
if (time_since_start > max_running_time - grace_time) and not start_time_ok:
155195
logger.fatal(f'max running time ({max_running_time}s) minus grace time ({grace_time}s) has been '
156196
f'exceeded - time to abort pilot')
157197
reached_maxtime_abort(args)
@@ -206,6 +246,10 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
206246
print((f"monitor: exception caught: {error}"))
207247
raise PilotException(error) from error
208248

249+
# shut down the cgroups monitoring task
250+
# logger.info("[monitor] waiting for cgroup monitor task to finish")
251+
# await task
252+
209253
logger.info('[monitor] control thread has ended')
210254

211255

pilot/control/payloads/generic.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@
3434
from typing import Any, TextIO
3535

3636
from pilot.common.errorcodes import ErrorCodes
37+
from pilot.common.pilotcache import get_pilot_cache
3738
from pilot.control.job import send_state
3839
from pilot.info import JobData
3940
from pilot.util.auxiliary import (
4041
set_pilot_state, # , show_memory_usage
4142
list_items
4243
)
44+
from pilot.util.cgroups import move_process_and_descendants_to_cgroup
4345
from pilot.util.config import config
4446
from pilot.util.container import execute
4547
from pilot.util.constants import (
@@ -72,6 +74,7 @@
7274

7375
logger = logging.getLogger(__name__)
7476
errors = ErrorCodes()
77+
pilot_cache = get_pilot_cache()
7578

7679

7780
class Executor:
@@ -612,8 +615,21 @@ def run_payload(self, job: JobData, cmd: str, out: Any, err: Any) -> Any:
612615
job.pgrp = os.getpgid(job.pid)
613616
set_pilot_state(job=job, state="running")
614617

615-
# _cmd = self.utility_with_payload(job)
618+
# move the payload process to the cgroup if cgroups are used
619+
try:
620+
if pilot_cache.use_cgroups:
621+
cgroup_path = pilot_cache.get_cgroup("subprocesses")
622+
if cgroup_path:
623+
logger.info(
624+
f"moving process (pid={job.pid}) to cgroup: {cgroup_path}"
625+
)
626+
_ = move_process_and_descendants_to_cgroup(cgroup_path, job.pid)
627+
else:
628+
logger.warning("cannot move process to cgroup - no cgroup path found")
629+
except Exception as e:
630+
logger.warning(f"failed to move process to cgroup: {e}")
616631

632+
# _cmd = self.utility_with_payload(job)
617633
self.utility_after_payload_started(job)
618634

619635
return proc

0 commit comments

Comments
 (0)