Skip to content

Commit c2f23d6

Browse files
authored
Merge pull request #164 from PanDAWMS/next
3.10.1.14
2 parents 03437bc + a9d0911 commit c2f23d6

20 files changed

Lines changed: 284 additions & 178 deletions

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.10.0.24
1+
3.10.1.14

doc/conf.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
#
1919
# Authors:
2020
# - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
21-
# - Paul Nilsson, paul.nilsson@cern.ch, 2023
21+
# - Paul Nilsson, paul.nilsson@cern.ch, 2023-25
2222
#
2323
# -*- coding: utf-8 -*-
2424
#
25-
# Pilot 2 documentation build configuration file, created by
25+
# Pilot 3 documentation build configuration file, created by
2626
# sphinx-quickstart on Thu Apr 13 16:16:52 2017.
2727
#
2828
# This file is execfile()d with the current directory set to its
@@ -74,18 +74,18 @@
7474
master_doc = 'index'
7575

7676
# General information about the project.
77-
project = u'Pilot 2'
78-
copyright = u'2017, Paul Nilsson, Mario Lassnig, Daniil Drizhuk, ...'
79-
author = u'Paul Nilsson, Mario Lassnig, Daniil Drizhuk, ...'
77+
project = 'Pilot 3'
78+
# copyright = ''
79+
author = 'Paul Nilsson'
8080

8181
# The version info for the project you're documenting, acts as replacement for
8282
# |version| and |release|, also used in various other places throughout the
8383
# built documents.
8484
#
8585
# The short X.Y version.
86-
version = u''
86+
version = ''
8787
# The full version, including alpha/beta/rc tags.
88-
release = u''
88+
release = ''
8989

9090
# The language for content autogenerated by Sphinx. Refer to documentation
9191
# for a list of supported languages.
@@ -164,8 +164,8 @@
164164
# (source start file, target name, title,
165165
# author, documentclass [howto, manual, or own class]).
166166
latex_documents = [
167-
(master_doc, 'Pilot2.tex', u'Pilot 2 Documentation',
168-
u'Paul Nilsson, Mario Lassnig, Daniil Drizhuk, ...', 'manual'),
167+
(master_doc, 'Pilot3.tex', 'Pilot 3 Documentation',
168+
'Paul Nilsson, Mario Lassnig, Daniil Drizhuk, ...', 'manual'),
169169
]
170170

171171

@@ -174,7 +174,7 @@
174174
# One entry per manual page. List of tuples
175175
# (source start file, name, description, authors, manual section).
176176
man_pages = [
177-
(master_doc, 'pilot2', u'Pilot 2 Documentation',
177+
(master_doc, 'pilot3', 'Pilot 3 Documentation',
178178
[author], 1)
179179
]
180180

@@ -185,8 +185,8 @@
185185
# (source start file, target name, title, author,
186186
# dir menu entry, description, category)
187187
texinfo_documents = [
188-
(master_doc, 'Pilot2', u'Pilot 2 Documentation',
189-
author, 'Pilot2', 'One line description of project.',
188+
(master_doc, 'Pilot3', 'Pilot 3 Documentation',
189+
author, 'Pilot3', 'One line description of project.',
190190
'Miscellaneous'),
191191
]
192192

pilot.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
PILOT_MULTIJOB_START_TIME,
5353
PILOT_START_TIME,
5454
SERVER_UPDATE_NOT_DONE,
55-
SUCCESS,
5655
)
5756
from pilot.util.cvmfs import (
5857
cvmfs_diagnostics,
@@ -836,7 +835,6 @@ def get_proper_exit_code() -> (int, int):
836835
logging.getLogger(__name__).info(
837836
f"pilot has finished ({trace.pilot['nr_jobs']} jobs were processed)"
838837
)
839-
exitcode = SUCCESS
840838
elif trace.pilot["state"] == FAILURE:
841839
logging.critical("pilot workflow failure -- aborting")
842840
elif trace.pilot["state"] == ERRNO_NOJOBS:

pilot/api/analytics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ def __init__(self, **kwargs):
314314
if len(self._x) != len(self._y):
315315
raise NotSameLength("input data (lists) have different lengths")
316316

317-
logger.info(f'model: {self._model}, x: {self._x}, y: {self._y}')
317+
logger.debug(f'model: {self._model}, x: {self._x}, y: {self._y}')
318318
# base calculations
319319
if self._model == "linear":
320320
self._ss = sum_square_dev(self._x)

pilot/common/errorcodes.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
# under the License.
1818
#
1919
# Authors:
20-
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2024
20+
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-25
2121
# - Wen Guan, wen.guan@cern.ch, 2018
2222

2323
"""Error codes set by the pilot."""
2424

2525
import re
26+
from json import dump
2627
from typing import Any
2728

2829

@@ -590,6 +591,29 @@ def format_diagnostics(self, code: int, diag: str) -> str:
590591

591592
return error_message
592593

594+
@classmethod
595+
def get_error_name(cls, code: int) -> str:
596+
"""
597+
Returns the name of the error constant given its value.
598+
Assumes that error constants are defined as uppercase integers in the class.
599+
"""
600+
for name, value in cls.__dict__.items():
601+
if isinstance(value, int) and value == code and name.isupper():
602+
return name
603+
604+
return str(code) # fallback if not found
605+
606+
@classmethod
607+
def generate_json(cls, filename: str = "error_codes.json"):
608+
"""Generate a JSON object containing the error codes and diagnostics."""
609+
error_dict = {}
610+
for error_code, message in cls._error_messages.items():
611+
error_name = cls.get_error_name(error_code)
612+
error_dict[error_code] = [error_name, message]
613+
614+
with open(filename, "w", encoding='utf-8') as f:
615+
dump(error_dict, f, indent=4)
616+
593617
@classmethod
594618
def is_recoverable(cls, code: int = 0) -> bool:
595619
"""

pilot/control/job.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
check_for_final_server_update,
5555
encode_globaljobid,
5656
get_batchsystem_jobid,
57-
get_display_info,
57+
# get_display_info,
5858
get_job_scheduler_id,
5959
get_pilot_state,
6060
has_instruction_sets,
@@ -456,7 +456,7 @@ def send_state(job: Any, args: Any, state: str, xml: str = "", metadata: str = "
456456
# does the server update contain any backchannel information? if so, update the job object
457457
handle_backchannel_command(res, job, args, test_tobekilled=test_tobekilled)
458458

459-
if final and os.path.exists(job.workdir): # ignore if workdir doesn't exist - might be a delayed jobUpdate
459+
if final: # and os.path.exists(job.workdir): # ignore if workdir doesn't exist - might be a delayed jobUpdate
460460
os.environ['SERVER_UPDATE'] = SERVER_UPDATE_FINAL
461461

462462
if final and state in {'finished', 'holding', 'failed'}:
@@ -763,14 +763,15 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata:
763763

764764
# CPU instruction set
765765
instruction_sets = has_instruction_sets(['AVX2'])
766-
product, vendor = get_display_info()
766+
# if the product and vendor info is needed, better to cache it since it is expensive to get
767+
# product, vendor = get_display_info()
767768
if instruction_sets:
768769
if 'cpuConsumptionUnit' in data:
769770
data['cpuConsumptionUnit'] += '+' + instruction_sets
770771
else:
771772
data['cpuConsumptionUnit'] = instruction_sets
772-
if product and vendor:
773-
logger.debug(f'cpuConsumptionUnit: could have added: product={product}, vendor={vendor}')
773+
#if product and vendor:
774+
# logger.debug(f'cpuConsumptionUnit: could have added: product={product}, vendor={vendor}')
774775

775776
# CPU architecture
776777
cpu_arch = get_cpu_arch()
@@ -2187,7 +2188,7 @@ def retrieve(queues: namedtuple, traces: Any, args: object): # noqa: C901
21872188
logger.info(f'job {job.jobid} has start time={job.starttime}')
21882189

21892190
# inform the server if this job should be in debug mode (real-time logging), decided by queuedata
2190-
if "loggingfile" in job.infosys.queuedata.catchall:
2191+
if "setdebugmode" in job.infosys.queuedata.catchall:
21912192
set_debug_mode(job.jobid, args.url, args.port)
21922193

21932194
# logger.info('resetting any existing errors')
@@ -3024,11 +3025,17 @@ def job_monitor(queues: namedtuple, traces: Any, args: object): # noqa: C901
30243025
error_code = errors.PANDAKILL
30253026
elif os.environ.get('REACHED_MAXTIME', None):
30263027
# the batch system max time has been reached, time to abort (in the next step)
3027-
logger.info('REACHED_MAXTIME seen by job monitor - abort everything')
3028+
logger.info('REACHED_MAXTIME seen by job monitor - sleeping up to 30 s before aborting job')
3029+
counter = 0
3030+
while os.environ['SERVER_UPDATE'] != SERVER_UPDATE_FINAL and counter < 30:
3031+
time.sleep(1)
3032+
counter += 1
3033+
30283034
if not args.graceful_stop.is_set():
30293035
logger.info('setting graceful_stop since it was not set already')
30303036
args.graceful_stop.set()
30313037
error_code = errors.REACHEDMAXTIME
3038+
30323039
if error_code:
30333040
jobs[i].state = 'failed'
30343041
jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(error_code)
@@ -3099,8 +3106,9 @@ def job_monitor(queues: namedtuple, traces: Any, args: object): # noqa: C901
30993106
break
31003107
else:
31013108
# note: when sending a state change to the server, the server might respond with 'tobekilled'
3102-
if _job.state == 'failed':
3103-
logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (2)')
3109+
# only if combined with tobekilled, in which case errors.PANDAKILL is set
3110+
if _job.state == 'failed' and errors.PANDAKILL in _job.piloterrorcodes:
3111+
logger.warning('job state is \'failed\' and errors.PANDAKILL is set - order log transfer and abort job_monitor() (2)')
31043112
_job.stageout = 'log' # only stage-out log file
31053113
put_in_queue(_job, queues.data_out)
31063114
#abort = True

pilot/control/monitor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
159159
break
160160

161161
if n_iterations % 60 == 0:
162-
logger.info(f'{time_since_start}s have passed since pilot start')
162+
logger.info(f"{time_since_start}s have passed since pilot start - server update state is \'{environ['SERVER_UPDATE']}\'")
163+
logger.debug(f"args.update_server={args.update_server}")
163164

164165
# every minute run the following check
165166
if is_pilot_check(check='machinefeatures'):
@@ -170,6 +171,9 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
170171
break
171172
last_minute_check = time.time()
172173

174+
# test max
175+
#time.sleep(120)
176+
#reached_maxtime_abort(args)
173177
# take a nap
174178
time.sleep(1)
175179

pilot/control/payload.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ def get_logging_info(job: JobData, args: object) -> dict:
446446
logger.info("correct logserver format: logging_type;protocol://hostname:port")
447447
return {}
448448

449-
regex = r"logserver='(?P<logging_type>[^;]+);(?P<protocol>[^:]+)://(?P<hostname>[^:]+):(?P<port>\d+)'"
449+
regex = r"logserver=(?P<logging_type>[^;]+);(?P<protocol>[^:]+)://(?P<hostname>[^:]+):(?P<port>\d+)"
450450
match = search(regex, logserver)
451451
if match:
452452
logging_type = match.group('logging_type')
@@ -489,6 +489,23 @@ def get_logging_info(job: JobData, args: object) -> dict:
489489
return info_dic
490490

491491

492+
def get_catchall_loggingfile(catchall: str) -> str:
493+
"""
494+
Extract the logging file from the catchall field if present.
495+
496+
:param catchall: catchall field from queuedata (str)
497+
:return: logging file name (str).
498+
"""
499+
filename = ""
500+
if catchall and "loggingfile" in catchall:
501+
_filename = findall(r'loggingfile=([^,]+)', catchall)
502+
if _filename:
503+
filename = _filename[0]
504+
logger.debug(f'found filename in catchall: {filename}')
505+
506+
return filename
507+
508+
492509
def find_log_to_tail(debug_command: str, workdir: str, args: object, is_analysis: bool, catchall: str) -> str:
493510
"""
494511
Find the log file to tail in the RT logging.
@@ -505,10 +522,16 @@ def find_log_to_tail(debug_command: str, workdir: str, args: object, is_analysis
505522
counter = 0
506523
maxwait = 5 * 60
507524

525+
# get filename from env or from catchall if present
526+
filename_env = os.environ.get('REALTIME_LOGFILE', None)
527+
filename_catchall = get_catchall_loggingfile(catchall) if not filename_env else None
528+
529+
# .. otherwise get it from the debug command or use default for analysis jobs
508530
if 'tail' in debug_command:
509531
filename = debug_command.split(' ')[-1]
510-
elif is_analysis:
532+
elif is_analysis and not filename_env and not filename_catchall:
511533
filename = 'tmp.stdout*'
534+
512535
if filename:
513536
logger.debug(f'filename={filename}')
514537
while counter < maxwait and not args.graceful_stop.is_set():
@@ -520,12 +543,13 @@ def find_log_to_tail(debug_command: str, workdir: str, args: object, is_analysis
520543
break
521544
counter += 10
522545

523-
if not path and "loggingfile" in catchall:
546+
if not path and filename_env:
547+
# extract the path from the env variable
548+
path = filename_env
549+
550+
if not path and filename_catchall:
524551
# extract the path from the catchall "..,loggingfile=path,.."
525-
_path = findall(r'loggingfile=([^,]+)', catchall)
526-
if _path:
527-
path = _path[0]
528-
logger.debug(f'found path in catchall: {path}')
552+
path = filename_catchall
529553

530554
# fallback to known log file if no other file could be found
531555
logf = path if path else config.Payload.payloadstdout
@@ -702,7 +726,7 @@ def perform_initial_payload_error_analysis(job: JobData, exit_code: int):
702726

703727
if exit_code < 1000:
704728
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXECUTIONFAILURE,
705-
msg=error_diag)
729+
msg=msg)
706730
else:
707731
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=msg)
708732
else:

pilot/test/test_timeout.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env python
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
# Authors:
20+
# - Paul Nilsson, paul.nilsson@cern.ch, 2025
21+
22+
"""Unit test functions for time-outs."""
23+
24+
import unittest
25+
from time import sleep
26+
from pilot.util.auxiliary import TimeoutException
27+
from pilot.util.timer import (
28+
timeout,
29+
TimedThread
30+
)
31+
32+
33+
def spend_time(t):
34+
"""Function that simulates work by sleeping."""
35+
sleep(t)
36+
37+
38+
class TestTimeoutFunction(unittest.TestCase):
39+
def test_function_times_out(self):
40+
"""Test that the function times out correctly."""
41+
ctimeout = 1 # Timeout duration
42+
with self.assertRaises(TimeoutException):
43+
timeout(ctimeout, timer=TimedThread)(spend_time)(2) # Exceeds timeout
44+
45+
def test_function_completes_within_time(self):
46+
"""Test that the function completes if within timeout limit."""
47+
ctimeout = 3 # Longer timeout
48+
try:
49+
timeout(ctimeout, timer=TimedThread)(spend_time)(1) # Should not time out
50+
except TimeoutException:
51+
self.fail("TimeoutException was raised unexpectedly.")
52+
53+
54+
if __name__ == "__main__":
55+
unittest.main()

0 commit comments

Comments
 (0)