Skip to content

Commit 18498c7

Browse files
authored
Merge pull request #174 from PanDAWMS/next
3.10.4.12
2 parents 0070e15 + b071c68 commit 18498c7

26 files changed

Lines changed: 210 additions & 62 deletions

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.10.3.82
1+
3.10.4.12

pilot.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
shell_exit_code,
4646
)
4747
from pilot.util.batchsystem import is_htcondor_version_sufficient
48-
# from pilot.util.cgroups import create_cgroup
48+
from pilot.util.cgroups import create_cgroup
4949
from pilot.util.config import config
5050
from pilot.util.constants import (
5151
get_pilot_version,
@@ -811,8 +811,7 @@ def set_environment_variables():
811811

812812
# create a cgroup for the pilot
813813
if pilot_cache.use_cgroups:
814-
pass
815-
# _ = create_cgroup()
814+
_ = create_cgroup()
816815

817816

818817
def wrap_up() -> int:
@@ -961,7 +960,7 @@ def send_workernode_map(
961960
try:
962961
data = get_workernode_map(site)
963962
except Exception as e:
964-
logger.warning(f"exception caught: {e}")
963+
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
965964
else:
966965
send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
967966

pilot/api/data.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,8 +1086,8 @@ def prepare_destinations(self, files: list, activities: list or str, alt_exclude
10861086
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out
10871087
:return: updated fspec entries (list).
10881088
"""
1089-
if alt_exclude is None: # to bypass pylint complaint if declared as [] above
1090-
alt_exclude = []
1089+
1090+
alt_exclude = list(alt_exclude or [])
10911091

10921092
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
10931093
return files
@@ -1120,7 +1120,7 @@ def resolve_alt_destination(primary, exclude=None):
11201120

11211121
cur = storages.index(primary) if primary in storages else 0
11221122
inext = (cur + 1) % len(storages) # cycle storages, take the first elem when reach end
1123-
exclude = set([primary] + list(exclude if exclude is not None else []))
1123+
exclude = set([primary] + list(exclude or []))
11241124
alt = None
11251125
for attempt in range(len(exclude) or 1): # apply several tries to jump exclude entries (in case of dublicated data will stack)
11261126
inext = (cur + 1) % len(storages) # cycle storages, start from the beginning when reach end

pilot/common/errorcodes.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class ErrorCodes:
186186
PROXYTOOSHORT = 1382 # used at the beginning of the pilot to indicate that the proxy is too short
187187
STAGEOUTAUTHENTICATIONFAILURE = 1383
188188
QUEUENOTSETUPFORCONTAINERS = 1384
189+
NOJOBSINPANDA = 1385 # internally used code
189190

190191
_error_messages = {
191192
GENERALERROR: "General pilot error, consult batch log",
@@ -333,7 +334,7 @@ class ErrorCodes:
333334
PROXYTOOSHORT: "Proxy is too short",
334335
STAGEOUTAUTHENTICATIONFAILURE: "Authentication failure during stage-out",
335336
QUEUENOTSETUPFORCONTAINERS: "Queue is not set up for containers",
336-
337+
NOJOBSINPANDA: "No jobs in PanDA",
337338
}
338339

339340
put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
@@ -344,11 +345,11 @@ def reset_pilot_errors(self):
344345
ErrorCodes.pilot_error_codes = []
345346
ErrorCodes.pilot_error_diags = []
346347

347-
def get_kill_signal_error_code(self, signal: str) -> int:
348+
def get_kill_signal_error_code(self, signal_name: str) -> int:
348349
"""
349350
Match a kill signal with a corresponding Pilot error code.
350351
351-
:param signal: signal name (str).
352+
:param signal_name: signal name (str).
352353
:return: Pilot error code (int).
353354
"""
354355
signals_dictionary = {
@@ -358,9 +359,10 @@ def get_kill_signal_error_code(self, signal: str) -> int:
358359
"SIGXCPU": self.SIGXCPU,
359360
"SIGUSR1": self.SIGUSR1,
360361
"SIGBUS": self.SIGBUS,
362+
"SIGINT": self.SIGINT,
361363
}
362364

363-
return signals_dictionary.get(signal, self.KILLSIGNAL)
365+
return signals_dictionary.get(signal_name, self.KILLSIGNAL)
364366

365367
def get_error_message(self, errorcode: int) -> str:
366368
"""
@@ -445,13 +447,17 @@ def report_errors(self) -> str:
445447

446448
return report
447449

448-
def resolve_transform_error(self, exit_code: int, stderr: str) -> int:
450+
def resolve_transform_error(self, exit_code: int, stderr: str) -> tuple[int, str]:
449451
"""
450452
Assign a pilot error code to a specific transform error.
451453
452-
:param exit_code: transform exit code (int)
453-
:param stderr: transform stderr (str)
454-
:return: pilot error code (int).
454+
Args:
455+
exit_code (int): Transform exit code.
456+
stderr (str): Transform stderr.
457+
458+
Returns:
459+
int: Pilot error code.
460+
str: Error message if extracted from stderr, otherwise an empty string.
455461
"""
456462
error_map = {
457463
"Not mounting requested bind point": self.SINGULARITYBINDPOINTFAILURE,
@@ -464,28 +470,51 @@ def resolve_transform_error(self, exit_code: int, stderr: str) -> int:
464470
"Apptainer is not installed": self.APPTAINERNOTINSTALLED,
465471
"cannot create directory": self.MKDIR,
466472
"General payload setup verification error": self.SETUPFAILURE,
473+
"No such file or directory": self.NOSUCHFILE,
467474
}
468475

476+
def get_key_by_value(d: dict, value: str) -> str:
477+
"""Return the key corresponding to a given value."""
478+
for k, v in d.items():
479+
if v == value:
480+
return k
481+
return ""
482+
469483
# Check if stderr contains any known error messages
484+
apptainer_codes = {
485+
self.SINGULARITYBINDPOINTFAILURE,
486+
self.SINGULARITYNOLOOPDEVICES,
487+
self.SINGULARITYIMAGEMOUNTFAILURE,
488+
self.SINGULARITYIMAGEMOUNTFAILURE,
489+
self.SINGULARITYGENERALFAILURE,
490+
self.SINGULARITYFAILEDUSERNAMESPACE,
491+
self.SINGULARITYNOTINSTALLED,
492+
self.APPTAINERNOTINSTALLED
493+
}
470494
for error_message, error_code in error_map.items():
471495
if error_message in stderr:
472-
return error_code
496+
# only allow overwriting exit code 0 for specific errors (read: apptainer)
497+
if exit_code == 0 and error_code in apptainer_codes:
498+
return error_code, error_message
499+
else:
500+
continue
473501

474502
# Handle specific exit codes
503+
key = get_key_by_value(error_map, exit_code)
475504
if exit_code == 2:
476-
return self.LSETUPTIMEDOUT
505+
return self.LSETUPTIMEDOUT, key
477506
if exit_code == 3:
478-
return self.REMOTEFILEOPENTIMEDOUT
507+
return self.REMOTEFILEOPENTIMEDOUT, key
479508
if exit_code == 251:
480-
return self.UNKNOWNTRFFAILURE
509+
return self.UNKNOWNTRFFAILURE, key
481510
if exit_code == -1:
482-
return self.UNKNOWNTRFFAILURE
511+
return self.UNKNOWNTRFFAILURE, key
483512
if exit_code == self.COMMANDTIMEDOUT:
484-
return exit_code
513+
return exit_code, key
485514
if exit_code != 0:
486-
return self.PAYLOADEXECUTIONFAILURE
515+
return self.PAYLOADEXECUTIONFAILURE, key
487516

488-
return exit_code # Return original exit code if no specific error is found
517+
return exit_code, key # Return original exit code if no specific error is found
489518

490519
def extract_stderr_error(self, stderr: str) -> str:
491520
"""

pilot/common/pilotcache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def __init__(self):
3333
self.use_cgroups = None # for process management
3434
self.cgroups = {} # for process management
3535
self.proxy_lifetime = 0
36-
self.queuedata = {}
36+
self.queuedata = None
3737
self.pilot_version = None
3838
self.pilot_work_dir = None
3939
self.pilot_source_dir = None

pilot/control/data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1114,7 +1114,7 @@ def generate_fileinfo(job: JobData) -> dict:
11141114
f'{checksum_type}': entry.checksum.get(config.File.checksum_type),
11151115
'surl': entry.turl
11161116
}
1117-
if entry.is_altstaged:
1117+
if True or entry.is_altstaged: # always report output RSE (ATLASPANDA-604)
11181118
dat['endpoint'] = entry.ddmendpoint
11191119

11201120
fileinfo[entry.lfn] = dat

pilot/control/job.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,8 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata:
807807
try:
808808
readfrac = data.get('totRBYTES') / _totalsize
809809
except (TypeError, ZeroDivisionError) as exc:
810-
logger.warning(f"failed to calculate {data.get('totRBYTES')}/{_totalsize}: {exc}")
810+
logger.warning(f"failed to calculate totRBYTES / total size of input files = {data.get('totRBYTES')}/{_totalsize}: {exc}")
811+
logger.warning('will not report readbyterate')
811812
readfrac = None
812813
else:
813814
readfrac = float_to_rounded_string(readfrac, precision=2)
@@ -1860,11 +1861,76 @@ def get_job_definition(queues: namedtuple, args: object) -> dict or str:
18601861
if abort:
18611862
res = None # None will trigger 'fatal' error and will finish the pilot
18621863
else:
1863-
res = get_job_definition_from_server(args, taskid=taskid)
1864+
if delay_to_get_job():
1865+
res = None
1866+
else:
1867+
res = get_job_definition_from_server(args, taskid=taskid)
18641868

18651869
return res
18661870

18671871

1872+
def get_load_factor() -> float:
1873+
"""
1874+
Get the pilot load factor.
1875+
"""
1876+
try:
1877+
load_factor = os.environ.get('PILOT_LOAD_FACTOR', None)
1878+
if load_factor:
1879+
try:
1880+
load_factor = float(load_factor)
1881+
except Exception as exc: # NOQA F841
1882+
load_factor = None
1883+
if not load_factor:
1884+
load_factor = float(config.Pilot.load_factor)
1885+
return load_factor
1886+
except Exception as exc:
1887+
logger.warning(f'delay_to_get_job caught exception: {exc}')
1888+
return 1.2
1889+
1890+
1891+
def is_delay_to_get_job_enabled() -> bool:
1892+
"""
1893+
To check if it's enabled to delay to get jobs based on load.
1894+
"""
1895+
try:
1896+
enabled = os.environ.get('PILOT_ENABLE_DELAY_TO_GET_JOB', 'false').lower() == 'true'
1897+
if enabled:
1898+
return enabled
1899+
enabled = bool(config.Pilot.enable_delay_to_get_job)
1900+
return enabled
1901+
except Exception as exc:
1902+
logger.warning(f'is_delay_to_get_job_enabled caught exception: {exc}')
1903+
return False
1904+
1905+
1906+
def delay_to_get_job() -> bool:
1907+
"""
1908+
Delay to get job if the machine load is too high.
1909+
1910+
This code checks the system's load average and compares it to a calculated maximum load based on the number
1911+
of CPU cores and a configurable load factor. If the 1-minute load average exceeds the maximum load, it logs
1912+
the condition and delays job retrieval.
1913+
1914+
:return: bool.
1915+
"""
1916+
try:
1917+
if not is_delay_to_get_job_enabled():
1918+
return False
1919+
1920+
load1, load5, load15 = os.getloadavg()
1921+
num_cores = os.cpu_count()
1922+
load_factor = get_load_factor()
1923+
max_load = num_cores * load_factor
1924+
logger.info(f"num_cores: {num_cores}, load_factor: {load_factor}, load1: {load1}, load5: {load5}, load15: {load15}")
1925+
if load1 > max_load:
1926+
logger.info(f"load1 ({load1}) > max_load ({max_load}) (num_cores * load_factor). delay to getjob.")
1927+
return True
1928+
return False
1929+
except Exception as exc:
1930+
logger.warning(f'delay_to_get_job caught exception: {exc}')
1931+
return False
1932+
1933+
18681934
def get_message_from_mb(args: Any) -> dict:
18691935
"""
18701936
Get a message from a message broker.
@@ -2222,6 +2288,10 @@ def retrieve(queues: namedtuple, traces: Any, args: object): # noqa: C901
22222288
getjob_failures += 1
22232289
if getjob_failures >= get_nr_getjob_failures(args.getjob_failures, args.harvester_submitmode):
22242290
logger.warning(f'did not get a job -- max number of job request failures reached: {getjob_failures} (setting graceful_stop)')
2291+
if getjob_failures >= 5 and pilot_cache.queuedata.resource_type.lower() == 'hpc':
2292+
logger.warning('setting error code to NOJOBSINPANDA on HPC resource')
2293+
traces.pilot['error_code'] = errors.NOJOBSINPANDA
2294+
22252295
args.graceful_stop.set()
22262296
break
22272297

@@ -2239,6 +2309,10 @@ def retrieve(queues: namedtuple, traces: Any, args: object): # noqa: C901
22392309
getjob_failures += 1
22402310
if getjob_failures >= get_nr_getjob_failures(args.getjob_failures, args.harvester_submitmode):
22412311
logger.warning(f'did not get a job -- max number of job request failures reached: {getjob_failures}')
2312+
if getjob_failures >= 5 and pilot_cache.queuedata.resource_type.lower() == 'hpc':
2313+
logger.warning('setting error code to NOJOBSINPANDA on HPC resource')
2314+
traces.pilot['error_code'] = errors.NOJOBSINPANDA
2315+
22422316
args.graceful_stop.set()
22432317
break
22442318

pilot/control/payload.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,14 @@ def execute_payloads(queues: namedtuple, traces: Any, args: object): # noqa: C9
273273
if exit_code and exit_code > 1000: # pilot error code, add to list
274274
logger.warning(f'pilot error code received (code={exit_code}, diagnostics=\n{diagnostics})')
275275
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)
276+
else:
277+
if exit_code >= 0:
278+
job.transexitcode = exit_code % 255
279+
else:
280+
logger.warning(f'pilot error code received a negative transform exit code={exit_code} - will not set transexitcode')
276281

277282
logger.debug(f'run() returned exit_code={exit_code}')
278283
set_cpu_consumption_time(job)
279-
job.transexitcode = exit_code % 255
280284
if out:
281285
out.close()
282286
if err:
@@ -321,6 +325,11 @@ def execute_payloads(queues: namedtuple, traces: Any, args: object): # noqa: C9
321325
else:
322326
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.INTERNALPILOTPROBLEM, msg=error)
323327

328+
if exit_code < 0 and not job.piloterrorcodes:
329+
# exit code < 0 means that the payload was killed, e.g. by a signal
330+
logger.warning("it seems the payload was killed but no error code was assigned yet - setting SIGTERM error")
331+
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.SIGTERM)
332+
324333
if job.piloterrorcodes:
325334
exit_code_interpret = 1
326335

@@ -667,7 +676,7 @@ def set_cpu_consumption_time(job: JobData):
667676
logger.info(f'CPU consumption time: {cpuconsumptiontime} {job.cpuconsumptionunit} (rounded to {job.cpuconsumptiontime} {job.cpuconsumptionunit})')
668677

669678

670-
def perform_initial_payload_error_analysis(job: JobData, exit_code: int):
679+
def perform_initial_payload_error_analysis(job: JobData, exit_code: int): # noqa: C901
671680
"""
672681
Perform an initial analysis of the payload.
673682
@@ -678,6 +687,8 @@ def perform_initial_payload_error_analysis(job: JobData, exit_code: int):
678687
"""
679688
if exit_code != 0:
680689
logger.warning(f'main payload execution returned non-zero exit code: {exit_code}')
690+
if exit_code < 0:
691+
logger.warning("payload was killed (negative exit code)")
681692

682693
# check if the transform has produced an error report
683694
path = os.path.join(job.workdir, config.Payload.error_report)
@@ -700,7 +711,13 @@ def perform_initial_payload_error_analysis(job: JobData, exit_code: int):
700711
path = os.path.join(job.workdir, config.Payload.payloadstderr)
701712
if os.path.exists(path):
702713
stderr = read_file(path)
703-
exit_code = errors.resolve_transform_error(exit_code, stderr)
714+
_exit_code, error_message = errors.resolve_transform_error(exit_code, stderr)
715+
if error_message:
716+
logger.warning(f"found apptainer error in stderr: {error_message}")
717+
if exit_code == 0 and _exit_code != 0:
718+
logger.warning("will overwrite trf exit code 0 due to previous error")
719+
exit_code = _exit_code
720+
704721
else:
705722
stderr = ''
706723
logger.info(f'file does not exist: {path}')

pilot/control/payloads/generic.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,13 @@ def run(self) -> tuple[int, str]: # noqa: C901
860860
else "General payload setup verification error (check setup logs)"
861861
)
862862
# check for special errors in the output
863-
exit_code = errors.resolve_transform_error(exit_code, diagnostics)
863+
_exit_code, error_message = errors.resolve_transform_error(exit_code, diagnostics)
864+
if error_message:
865+
logger.warning(f"found apptainer error in stderr: {error_message}")
866+
if exit_code == 0 and _exit_code != 0:
867+
logger.warning("will overwrite trf exit code 0 due to previous error")
868+
exit_code = _exit_code
869+
864870
diagnostics = errors.format_diagnostics(exit_code, diagnostics)
865871
return exit_code, diagnostics
866872
if out:

pilot/info/queuedata.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class QueueData(BaseData):
6363
platform = "" # cmtconfig value
6464
container_options = "" # singularity only options? to be reviewed and forced to be a dict (support options for other containers?)
6565
container_type = {} # dict of container names by user as a key
66+
resource_type = "" # type of resource, e.g. 'grid', 'hpc'
6667
copytools = None
6768
acopytools = None
6869

@@ -104,7 +105,7 @@ class QueueData(BaseData):
104105
_keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
105106
'corecount', 'maxrss', 'maxtime', 'maxinputsize', 'memkillgrace'],
106107
str: ['name', 'type', 'appdir', 'catchall', 'platform', 'container_options', 'container_type',
107-
'resource', 'state', 'status', 'site', 'environ'],
108+
'resource', 'state', 'status', 'site', 'environ', 'resource_type'],
108109
dict: ['copytools', 'acopytools', 'astorages', 'aprotocols', 'acopytools_schemas', 'params'],
109110
bool: ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan', 'is_cvmfs', 'use_pcache']
110111
}

0 commit comments

Comments
 (0)