Skip to content

Commit 8d7d1f9

Browse files
authored
Merge pull request #186 from PanDAWMS/next
3.11.2.19
2 parents 32d96ac + 3c4b192 commit 8d7d1f9

14 files changed

Lines changed: 366 additions & 155 deletions

File tree

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.11.1.15
1+
3.11.2.19

pilot.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def main() -> int: # noqa: C901
176176
# create and report the worker node map
177177
if args.update_server and args.pilot_user.lower() == "atlas": # only send info for atlas for now
178178
try:
179-
send_workernode_map(infosys.queuedata.site, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
179+
send_workernode_map(infosys.queuedata.site, infosys.queuedata.name, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
180180
except Exception as error:
181181
logger.warning(f"exception caught when sending workernode map: {error}")
182182
try:
@@ -500,8 +500,19 @@ def send_worker_status(
500500
data["site"] = queue
501501
data["node_id"] = get_node_name()
502502

503+
data_new = {}
504+
data_new["worker_id"] = os.environ.get("HARVESTER_WORKER_ID", None)
505+
data_new["harvester_id"] = os.environ.get("HARVESTER_ID", None)
506+
data_new["status"] = status
507+
# data_new["site"] = queue
508+
data_new["node_id"] = get_node_name()
509+
503510
# attempt to send the worker info to the server
511+
# if data_new["worker_id"] and data_new["harvester_id"]:
504512
if data["workerID"] and data["harvesterID"]:
513+
# send_update(
514+
# "update_worker_status", data_new, url, port, ipv=internet_protocol_version, max_attempts=2
515+
# )
505516
send_update(
506517
"updateWorkerPilotStatus", data, url, port, ipv=internet_protocol_version, max_attempts=2
507518
)
@@ -511,6 +522,7 @@ def send_worker_status(
511522

512523
def send_workernode_map(
513524
site: str,
525+
queue: str,
514526
url: str,
515527
port: int,
516528
internet_protocol_version: str,
@@ -521,14 +533,15 @@ def send_workernode_map(
521533
522534
Args:
523535
site (str): ATLAS site name.
536+
queue (str): PanDA queue name.
524537
url (str): Server URL.
525538
port (int): Server port.
526539
internet_protocol_version (str): Internet protocol version, IPv4 or IPv6.
527540
logger (Any): Logging object.
528541
"""
529542
# worker node structure to be sent to the server
530543
try:
531-
data = get_workernode_map(site)
544+
data = get_workernode_map(site, queue)
532545
except Exception as e:
533546
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
534547
try:

pilot/control/job.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
get_batchsystem_jobid,
5757
# get_display_info,
5858
get_job_scheduler_id,
59+
get_pilot_id,
5960
get_pilot_state,
6061
has_instruction_sets,
6162
is_virtual_machine,
@@ -1110,7 +1111,8 @@ def remove_pilot_logs_from_list(list_of_files: list, jobid: str) -> list:
11101111
config.Container.container_script, config.Container.release_setup,
11111112
config.Container.stagein_status_dictionary, config.Container.stagein_replica_dictionary,
11121113
'eventLoopHeartBeat.txt', 'memory_monitor_output.txt', 'memory_monitor_summary.json_snapshot',
1113-
f'curl_updateJob_{jobid}.config', config.Pilot.pilot_heartbeat_file]
1114+
f'curl_updateJob_{jobid}.config', config.Pilot.pilot_heartbeat_file,
1115+
'./panda_token', 'panda_token']
11141116
except Exception as error:
11151117
logger.warning(f'exception caught: {error}')
11161118
to_be_removed = []
@@ -2369,7 +2371,9 @@ def retrieve(queues: namedtuple, traces: Any, args: object): # noqa: C901
23692371
# (only proceed if there is a condor class ad)
23702372
if os.environ.get('_CONDOR_JOB_AD', None):
23712373
htcondor_envvar(job.jobid)
2372-
update_condor_classad(pandaid=job.jobid, state='retrieved')
2374+
# update_condor_classad(pandaid=job.jobid, state='retrieved')
2375+
pilotid = get_pilot_id(args.version_tag)
2376+
update_condor_classad(pandaid=job.jobid, pilotid=pilotid)
23732377

23742378
# add the job definition to the jobs queue and increase the job counter,
23752379
# and wait until the job has finished

pilot/info/dataloader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ def load_url_data(cls, url: str, fname: str = None, cache_time: int = 0, nretry:
9393
:param sleep_time: sleep time (default is 60 s) between retry attempts (int)
9494
:return: data loaded from the url or file content if url passed is a filename (Any).
9595
"""
96+
logger.debug(f'xxx loading data url={url}')
97+
9698
@timeout(seconds=20)
9799
def _readfile(url: str) -> str:
98100
"""

pilot/info/extinfo.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ def jsonparser_panda(dat: Any) -> dict:
164164

165165
return {pandaqueue: _dat}
166166

167-
queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL') or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})
167+
_url = os.environ.get('QUEUEDATA_SERVER_URL')
168+
queuedata_url = (_url or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})
169+
_inf = getattr(config.Information, 'queuedata_url', '')
168170
cric_url = getattr(config.Information, 'queues_url', None)
169171
cric_url = cric_url.format(pandaqueue=pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues')
170172
cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')
@@ -192,6 +194,7 @@ def jsonparser_panda(dat: Any) -> dict:
192194
}
193195
}
194196

197+
logger.debug(f'xxx sources={sources}')
195198
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
196199
user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0)
197200
queuedata_source_priority = user.get_queuedata_priority()

pilot/info/jobdata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class JobData(BaseData):
8989
pandasecrets = "" # User defined secrets
9090
pilotsecrets = {} # Real-time logging secrets
9191
requestid = None # Request ID
92+
resourcetype = None # resource type (SCORE, MCORE, etc)
9293

9394
# set by the pilot (not from job definition)
9495
workdir = "" # working directory for this job
@@ -199,7 +200,7 @@ class JobData(BaseData):
199200
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
200201
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
201202
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
202-
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
203+
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus', 'resourcetype'],
203204
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
204205
'logdata', 'outdata', 'indata', 'cpufrequencies'],
205206
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
@@ -553,7 +554,8 @@ def load(self, data: dict, use_kmap: bool = True):
553554
'dask_scheduler_ip': 'scheduler_ip',
554555
'jupyter_session_ip': 'session_ip',
555556
'minramcount': 'minRamCount',
556-
'altstageout': 'altStageOut'
557+
'altstageout': 'altStageOut',
558+
'resourcetype': 'resource_type'
557559
} if use_kmap else {}
558560

559561
self._load_data(data, kmap)

pilot/info/queuedata.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class QueueData(BaseData):
100100
altstageout = None # allow altstageout: force (True) or disable (False) or no preferences (None)
101101
pilot_walltime_grace = 1.0 # pilot walltime grace factor
102102
pilot_rss_grace = 2.0 # pilot rss grace factor
103+
pilot_maxwdir_grace = 1.0 # pilot maxwdir grace factor
103104

104105
# specify the type of attributes for proper data validation and casting
105106
_keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
@@ -189,7 +190,8 @@ def allow_altstageout(self):
189190
def set_pilot_walltime_grace(self):
190191
"""Set pilot walltime grace factor based on the queuedata settings."""
191192
try:
192-
_pilot_walltime_grace = float(self.params.get('pilot_walltime_grace', 0))
193+
# using a 1% grace by default, which corresponds to 14 minutes for a 24-hour limit
194+
_pilot_walltime_grace = float(self.params.get('pilot_walltime_grace', 1))
193195
self.pilot_walltime_grace = 1.0 + _pilot_walltime_grace / 100.0
194196
except (ValueError, TypeError) as e:
195197
logger.warning(f"failed to set pilot_walltime_grace: {e}")
@@ -204,6 +206,15 @@ def set_pilot_rss_grace(self):
204206
logger.warning(f"failed to set pilot_rss_grace: {e}")
205207
self.pilot_rss_grace = 2.0
206208

209+
def set_pilot_maxwdir_grace(self):
210+
"""Set pilot maxwdir grace factor based on the queuedata settings."""
211+
try:
212+
_pilot_maxwdir_grace = float(self.params.get('pilot_maxwdir_grace', 20))
213+
self.pilot_maxwdir_grace = 1.0 + _pilot_maxwdir_grace / 100.0
214+
except (ValueError, TypeError) as e:
215+
logger.warning(f"failed to set pilot_maxwdir_grace: {e}")
216+
self.pilot_maxwdir_grace = 1.0
217+
207218
def clean(self):
208219
"""Validate and finally clean up required data values (required object properties) if needed."""
209220
# validate es_stageout_gap value
@@ -233,9 +244,10 @@ def clean(self):
233244
# set altstageout settings
234245
self.altstageout = self.allow_altstageout()
235246

236-
# set pilot walltime and rss grace factors
247+
# set pilot grace factors
237248
self.set_pilot_walltime_grace()
238249
self.set_pilot_rss_grace()
250+
self.set_pilot_maxwdir_grace()
239251

240252
## custom function pattern to apply extra validation to the key values
241253
##def clean__keyname(self, raw, value):

pilot/scripts/open_remote_file.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def get_file_lists(turls_string: str) -> dict:
121121

122122

123123
# pylint: disable=useless-param-doc
124-
def try_open_file(turl_str: str, _queues: namedtuple):
124+
def try_open_file_old(turl_str: str, _queues: namedtuple):
125125
"""
126126
Attempt to open a remote file.
127127
@@ -153,6 +153,60 @@ def try_open_file(turl_str: str, _queues: namedtuple):
153153
_queues.result.put(turl_str)
154154

155155

156+
# pylint: disable=useless-param-doc
157+
def try_open_file(turl_str: str, _queues: namedtuple):
158+
"""
159+
Attempt to open a remote file.
160+
161+
Successfully opened turls will be put in the queues.opened queue.
162+
Unsuccessful turls will be placed in the queues.unopened queue.
163+
164+
:param turl_str: turl (str)
165+
:param _queues: Namedtuple with 'opened', 'unopened', 'result' queues.
166+
"""
167+
168+
def attempt_open(path: str) -> bool:
169+
"""Return True if ROOT successfully opens the file."""
170+
try:
171+
message(f'opening {path}')
172+
_ = ROOT.TFile.SetOpenTimeout(
173+
30 * 1000) # 30 seconds
174+
in_file = ROOT.TFile.Open(path)
175+
except Exception as exc:
176+
message(f'caught exception: {exc}')
177+
return False
178+
179+
if in_file and in_file.IsOpen():
180+
in_file.Close()
181+
message(f'closed {path}')
182+
return True
183+
184+
return False
185+
186+
# --- First attempt (original TURL) ---
187+
opened = attempt_open(turl_str)
188+
189+
# --- Retry logic for davs failures ---
190+
if not opened:
191+
# We only retry if the failure looks like the DAVS issue
192+
retry_path = turl_str + "?filetype=raw"
193+
message("Retrying with ?filetype=raw appended")
194+
195+
opened = attempt_open(retry_path)
196+
197+
# If the retry succeeds, report success with the modified TURL
198+
if opened:
199+
turl_str = retry_path
200+
201+
# --- Queue results ---
202+
if opened:
203+
_queues.opened.put(turl_str)
204+
else:
205+
_queues.unopened.put(turl_str)
206+
207+
_queues.result.put(turl_str)
208+
209+
156210
# pylint: disable=useless-param-doc
157211
def spawn_file_open_thread(_queues: Any, file_list: list) -> threading.Thread:
158212
"""

pilot/util/auxiliary.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from pilot.common.errorcodes import ErrorCodes
4646
from pilot.util.condor import (
4747
get_globaljobid,
48-
update_condor_classad
48+
# update_condor_classad
4949
)
5050
from pilot.util.container import execute
5151
from pilot.util.filehandling import (
@@ -74,6 +74,23 @@ def pilot_version_banner() -> None:
7474
logger.info('*' * len(version))
7575

7676

77+
def get_pilot_id(version_tag: str) -> str:
78+
"""
79+
Return a unique pilot id.
80+
81+
Used by CondorHT ClassAd.
82+
83+
Args:
84+
version_tag: pilot version tag (string).
85+
86+
Returns:
87+
pilot id (string).
88+
"""
89+
unique_id = os.environ.get("GTAG", "unknown")
90+
pilotversion = os.environ.get('PILOT_VERSION')
91+
return f'{pilotversion}-{version_tag}-{unique_id}'
92+
93+
7794
def is_virtual_machine() -> bool:
7895
"""
7996
Determine if we are running in a virtual machine.
@@ -346,7 +363,7 @@ def set_pilot_state(job: Any = None, state: str = '') -> None:
346363

347364
if job and job.state != 'failed':
348365
job.state = state
349-
update_condor_classad(state=state)
366+
# update_condor_classad(state=state)
350367

351368

352369
def check_for_final_server_update(update_server: bool) -> None:

0 commit comments

Comments (0)