Skip to content

Commit d9a2ca4

Browse files
authored
Merge pull request #168 from PalNilsson/next
3.10.3.82
2 parents 6893824 + bf8da75 commit d9a2ca4

52 files changed

Lines changed: 4565 additions & 484 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.10.2.2
1+
3.10.3.82

pilot.py

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# Authors:
2020
# - Mario Lassnig, mario.lassnig@cern.ch, 2016-17
2121
# - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
22-
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24
22+
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-25
2323

2424
"""This is the entry point for the PanDA Pilot, executed with 'python3 pilot.py <args>'."""
2525

@@ -37,12 +37,15 @@
3737

3838
from pilot.common.errorcodes import ErrorCodes
3939
from pilot.common.exception import PilotException
40+
from pilot.common.pilotcache import get_pilot_cache
4041
from pilot.info import infosys
4142
from pilot.util.auxiliary import (
4243
convert_signal_to_exit_code,
4344
pilot_version_banner,
4445
shell_exit_code,
4546
)
47+
from pilot.util.batchsystem import is_htcondor_version_sufficient
48+
# from pilot.util.cgroups import create_cgroup
4649
from pilot.util.config import config
4750
from pilot.util.constants import (
4851
get_pilot_version,
@@ -61,7 +64,6 @@
6164
from pilot.util.filehandling import (
6265
get_pilot_work_dir,
6366
mkdirs,
64-
store_base_urls
6567
)
6668
from pilot.util.harvester import (
6769
is_harvester_mode,
@@ -78,9 +80,13 @@
7880
from pilot.util.networking import dump_ipv6_info
7981
from pilot.util.processgroups import find_defunct_subprocesses
8082
from pilot.util.timing import add_to_pilot_timing
81-
from pilot.util.workernode import get_node_name
83+
from pilot.util.workernode import (
84+
get_node_name,
85+
get_workernode_map
86+
)
8287

8388
errors = ErrorCodes()
89+
pilot_cache = get_pilot_cache()
8490
mainworkdir = ""
8591
args = None
8692
trace = None
@@ -135,6 +141,8 @@ def main() -> int:
135141
# initialize InfoService
136142
try:
137143
infosys.init(args.queue)
144+
pilot_cache.queuedata = infosys.queuedata
145+
138146
# check if queue is ACTIVE
139147
if infosys.queuedata.state != "ACTIVE":
140148
logger.critical(
@@ -152,11 +160,11 @@ def main() -> int:
152160
update_local_oidc_token_info(args.url, args.port)
153161

154162
# create and report the worker node map
155-
# site = infosys.queuedata.resource
156-
#if args.update_server and args.workerpilotstatusupdate:
157-
# send_worker_status(
158-
# "started", args.queue, args.url, args.port, logger, "IPv6"
159-
# ) # note: assuming IPv6, fallback in place
163+
if args.update_server and args.pilot_user.lower() == "atlas": # only send info for atlas for now
164+
try:
165+
send_workernode_map(infosys.queuedata.site, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
166+
except Exception as error:
167+
logger.warning(f"exception caught when sending workernode map: {error}")
160168

161169
# handle special CRIC variables via params
162170
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
@@ -498,10 +506,18 @@ def get_args() -> Any:
498506
help="PanDA server URL",
499507
)
500508
arg_parser.add_argument(
501-
"-p", "--port", dest="port", default=25443, help="PanDA server port"
509+
"-p",
510+
"--port",
511+
dest="port",
512+
type=int,
513+
default=25443,
514+
help="PanDA server port"
502515
)
503516
arg_parser.add_argument(
504-
"--queuedata-url", dest="queuedata_url", default="", help="Queuedata server URL"
517+
"--queuedata-url",
518+
dest="queuedata_url",
519+
default="",
520+
help="Queuedata server URL"
505521
)
506522
arg_parser.add_argument(
507523
"--storagedata-url",
@@ -738,22 +754,27 @@ def set_environment_variables():
738754
"""
739755
# working directory as set with a pilot option (e.g. ..)
740756
environ["PILOT_WORK_DIR"] = args.workdir # TODO: replace with singleton
757+
pilot_cache.pilot_work_dir = args.workdir
741758

742759
# main work directory (e.g. /scratch/PanDA_Pilot3_3908_1537173670)
743760
environ["PILOT_HOME"] = mainworkdir # TODO: replace with singleton
761+
pilot_cache.pilot_home_dir = mainworkdir
744762

745763
# pilot source directory (e.g. /cluster/home/usatlas1/gram_scratch_hHq4Ns/condorg_oqmHdWxz)
746764
if not environ.get("PILOT_SOURCE_DIR", None):
747765
environ["PILOT_SOURCE_DIR"] = args.sourcedir # TODO: replace with singleton
766+
pilot_cache.pilot_source_dir = args.sourcedir
748767

749768
# set the pilot user (e.g. ATLAS)
750769
environ["PILOT_USER"] = args.pilot_user # TODO: replace with singleton
751770

752771
# internal pilot state
753772
environ["PILOT_JOB_STATE"] = "startup" # TODO: replace with singleton
773+
pilot_cache.pilot_job_state = "startup"
754774

755775
# set the pilot version
756776
environ["PILOT_VERSION"] = get_pilot_version()
777+
pilot_cache.pilot_version = get_pilot_version()
757778

758779
# set the default wrap-up/finish instruction
759780
environ["PILOT_WRAP_UP"] = "NORMAL"
@@ -785,6 +806,14 @@ def set_environment_variables():
785806
if args.storagedata_url:
786807
environ["STORAGEDATA_SERVER_URL"] = f"{args.storagedata_url}"
787808

809+
# should cgroups be used for process management?
810+
pilot_cache.use_cgroups = is_htcondor_version_sufficient() if args.pilot_user.lower() == 'atlas' else False
811+
812+
# create a cgroup for the pilot
813+
if pilot_cache.use_cgroups:
814+
pass
815+
# _ = create_cgroup()
816+
788817

789818
def wrap_up() -> int:
790819
"""
@@ -879,7 +908,7 @@ def send_worker_status(
879908
status: str,
880909
queue: str,
881910
url: str,
882-
port: str,
911+
port: int,
883912
logger: Any,
884913
internet_protocol_version: str,
885914
):
@@ -888,12 +917,12 @@ def send_worker_status(
888917
889918
Note: the function can fail, but if it does, it will be ignored.
890919
891-
:param status: 'started' or 'finished' (string).
892-
:param queue: PanDA queue name (string).
893-
:param url: server url (string).
894-
:param port: server port (string).
895-
:param logger: logging object.
896-
:param internet_protocol_version: internet protocol version, IPv4 or IPv6 (string).
920+
:param status: 'started' or 'finished' (str)
921+
:param queue: PanDA queue name (str)
922+
:param url: server url (str)
923+
:param port: server port (int)
924+
:param logger: logging object (object)
925+
:param internet_protocol_version: internet protocol version, IPv4 or IPv6 (str).
897926
"""
898927
# worker node structure to be sent to the server
899928
data = {}
@@ -906,35 +935,35 @@ def send_worker_status(
906935
# attempt to send the worker info to the server
907936
if data["workerID"] and data["harvesterID"]:
908937
send_update(
909-
"updateWorkerPilotStatus", data, url, port, ipv=internet_protocol_version
938+
"updateWorkerPilotStatus", data, url, port, ipv=internet_protocol_version, max_attempts=2
910939
)
911940
else:
912-
logger.warning(
913-
"workerID/harvesterID not known, will not send worker status to server"
914-
)
941+
logger.warning("workerID/harvesterID not known, will not send worker status to server")
915942

916943

917944
def send_workernode_map(
    site: str,
    url: str,
    port: int,
    internet_protocol_version: str,
    logger: Any,
):
    """
    Send worker node map to the server.

    Collecting the map is best-effort: any failure is logged as a
    warning and the server update is skipped, so this function never
    raises on a bad worker node probe.

    :param site: ATLAS site name (str)
    :param url: server url (str)
    :param port: server port (int)
    :param internet_protocol_version: internet protocol version, IPv4 or IPv6 (str)
    :param logger: logging object (object).
    """
    # build the worker node structure to be sent to the server; only
    # attempt the update when the map was produced without error
    collected = True
    try:
        data = get_workernode_map(site)
    except Exception as e:
        collected = False
        logger.warning(f"exception caught: {e}")
    if collected:
        send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
938967

939968

940969
def set_lifetime():
@@ -1019,10 +1048,6 @@ def list_zombies():
10191048
# set environment variables (to be replaced with singleton implementation)
10201049
set_environment_variables()
10211050

1022-
# store base URLs in a file if set
1023-
if args.baseurls:
1024-
store_base_urls(args.baseurls)
1025-
10261051
# execute main function
10271052
trace = main()
10281053

pilot/common/errorcodes.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ class ErrorCodes:
185185
ARCPROXYLIBFAILURE = 1381
186186
PROXYTOOSHORT = 1382 # used at the beginning of the pilot to indicate that the proxy is too short
187187
STAGEOUTAUTHENTICATIONFAILURE = 1383
188+
QUEUENOTSETUPFORCONTAINERS = 1384
188189

189190
_error_messages = {
190191
GENERALERROR: "General pilot error, consult batch log",
@@ -331,6 +332,7 @@ class ErrorCodes:
331332
ARCPROXYLIBFAILURE: "Arcproxy failure while loading shared libraries",
332333
PROXYTOOSHORT: "Proxy is too short",
333334
STAGEOUTAUTHENTICATIONFAILURE: "Authentication failure during stage-out",
335+
QUEUENOTSETUPFORCONTAINERS: "Queue is not set up for containers",
334336

335337
}
336338

@@ -605,7 +607,12 @@ def get_error_name(cls, code: int) -> str:
605607

606608
@classmethod
607609
def generate_json(cls, filename: str = "error_codes.json"):
608-
"""Generate a JSON object containing the error codes and diagnostics."""
610+
"""
611+
Generate a JSON object containing the error codes and diagnostics.
612+
613+
Args:
614+
str filename: The name of the JSON file to save the error codes and diagnostics.
615+
"""
609616
error_dict = {}
610617
for error_code, message in cls._error_messages.items():
611618
error_name = cls.get_error_name(error_code)
@@ -614,6 +621,22 @@ def generate_json(cls, filename: str = "error_codes.json"):
614621
with open(filename, "w", encoding='utf-8') as f:
615622
dump(error_dict, f, indent=4)
616623

624+
@classmethod
def convert_acronym_to_code(cls, filename: str = "acronyms.json"):
    """
    Convert the acronyms in the ErrorCode class and store them in a JSON with the error codes as values.

    The resulting file maps each error-code acronym (e.g. "GENERALERROR")
    to its numeric error code.

    Args:
        filename (str): The name of the JSON file to save the acronyms and error codes.
    """
    # iterate the keys directly (the messages are not needed here) and
    # build the acronym -> code mapping in one comprehension
    error_codes = {
        cls.get_error_name(error_code): error_code
        for error_code in cls._error_messages
    }

    with open(filename, "w", encoding='utf-8') as f:
        dump(error_codes, f, indent=4)
639+
617640
@classmethod
618641
def is_recoverable(cls, code: int = 0) -> bool:
619642
"""

pilot/common/pilotcache.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#!/usr/bin/env python
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
# Authors:
20+
# - Paul Nilsson, paul.nilsson@cern.ch, 2025
21+
22+
"""Persistent memory cache for data structures used by the pilot."""
23+
24+
from functools import lru_cache
25+
26+
27+
@lru_cache(maxsize=1)
def get_pilot_cache():
    """
    Get the dedicated memory cache for the pilot.

    The lru_cache(maxsize=1) decorator on a zero-argument function makes
    this a process-wide singleton: the PilotCache instance is created on
    the first call, and every subsequent call returns the same object.

    Returns:
        PilotCache: the singleton in-memory cache.
    """
    class PilotCache:
        """In-memory container for data structures shared across the pilot."""

        def __init__(self):
            """Define standard initialization for the cache."""
            self.use_cgroups = None  # for process management
            self.cgroups = {}  # for process management (key: pid, value: cgroup path)
            self.proxy_lifetime = 0
            self.queuedata = {}
            self.pilot_version = None
            self.pilot_work_dir = None
            self.pilot_source_dir = None
            self.pilot_home_dir = None
            self.current_job_id = None
            self.current_job_state = None

        def get_pids(self):
            """
            Get the list of process IDs (PIDs) from the cgroups dictionary.

            Returns:
                list: List of PIDs.
            """
            # iterating a dict yields its keys; no need for .keys()
            return list(self.cgroups)

        def add_cgroup(self, key, value):
            """
            Add an entry to the cgroups dictionary.

            Normally, the process id would be used as the key, and a
            typical value will be the path to the cgroup.

            This is used to keep track of the cgroups for each process.

            Args:
                key (str): Key for the cgroups entry.
                value: Value for the cgroups entry.
            """
            self.cgroups[key] = value

        def get_cgroup(self, key, default=None):
            """
            Get an entry from the cgroups dictionary.

            Args:
                key (str): Key for the cgroups entry.
                default: Value to return if the key doesn't exist (default: None).

            Returns:
                The value associated with the key, or default if the key doesn't exist.
            """
            return self.cgroups.get(key, default)

    return PilotCache()

0 commit comments

Comments
 (0)