Skip to content

Commit b4e1b2e

Browse files
authored
Merge pull request #183 from PalNilsson/next
3.11.1.15
2 parents 14a6628 + b82de81 commit b4e1b2e

30 files changed

Lines changed: 2792 additions & 141 deletions
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
name: Circular import check
2+
3+
on:
4+
push:
5+
branches: [master, next]
6+
paths:
7+
- "**.py"
8+
- ".github/workflows/circular-imports.yml"
9+
pull_request:
10+
branches: [master, next]
11+
paths:
12+
- "**.py"
13+
- ".github/workflows/circular-imports.yml"
14+
workflow_dispatch:
15+
16+
permissions:
17+
contents: read
18+
19+
jobs:
20+
circular-imports:
21+
name: circular-imports (py${{ matrix.python-version }})
22+
runs-on: ubuntu-latest
23+
strategy:
24+
fail-fast: false
25+
matrix:
26+
python-version: ["3.9", "3.11", "3.12"]
27+
28+
concurrency:
29+
group: circular-imports-${{ github.ref }}-py${{ matrix.python-version }}
30+
cancel-in-progress: true
31+
32+
env:
33+
# Pin or set to "" to use latest from PyPI
34+
CIRC_DET_VERSION: "==1.0.16"
35+
# If your code is in a subfolder, change this
36+
PROJECT_PATH: "."
37+
38+
steps:
39+
- name: Checkout
40+
uses: actions/checkout@v4
41+
42+
- name: Set up Python ${{ matrix.python-version }}
43+
uses: actions/setup-python@v5
44+
with:
45+
python-version: ${{ matrix.python-version }}
46+
cache: pip
47+
48+
- name: Install circular-import-detector
49+
run: |
50+
python -m pip install --upgrade pip
51+
python -m pip install "circular-import-detector${CIRC_DET_VERSION}"
52+
53+
- name: Run detector (verbose) and save report
54+
run: |
55+
set -o pipefail
56+
circular-import-detector "$PROJECT_PATH" --verbose --exit-code \
57+
| tee "circular-import-report-py${{ matrix.python-version }}.txt"
58+
59+
- name: Upload report artifact
60+
if: always()
61+
uses: actions/upload-artifact@v4
62+
with:
63+
name: circular-import-report-py${{ matrix.python-version }}
64+
path: circular-import-report-py${{ matrix.python-version }}.txt

.pre-commit-config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,15 @@ repos:
1111
rev: 6.1.0
1212
hooks:
1313
- id: flake8
14+
- repo: local
15+
hooks:
16+
- id: circular-imports
17+
name: Check for circular imports (changed files)
18+
entry: circular-import-precommit
19+
language: python
20+
additional_dependencies:
21+
- circular-import-detector==1.0.18
22+
pass_filenames: true
23+
types_or: [python]
24+
stages: [pre-commit]
25+

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.11.0.29
1+
3.11.1.15

pilot.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ def main() -> int: # noqa: C901
152152
f"specified queue is NOT ACTIVE: {infosys.queuedata.name} -- aborting"
153153
)
154154
return errors.PANDAQUEUENOTACTIVE
155+
156+
# make sure the queue is online
157+
if infosys.queuedata.status.lower() == "offline":
158+
logger.critical(
159+
f"specified queue is OFFLINE: {infosys.queuedata.name} -- aborting"
160+
)
161+
return errors.PANDAQUEUENOTONLINE
162+
155163
except PilotException as error:
156164
logger.fatal(error)
157165
return error.get_error_code()

pilot/api/analytics.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
"""Functions for performing analytics including fitting of data."""
2323

2424
import logging
25-
from typing import Any
26-
25+
from typing import Any, Union
2726
from .services import Services
2827
from pilot.common.exception import NotDefined, NotSameLength, UnknownException
2928
from pilot.util.filehandling import get_table_from_file
@@ -51,11 +50,16 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any:
5150
5251
For a linear model: y(x) = slope * x + intersect
5352
54-
:param x: list of input data (list of floats or ints) (list)
55-
:param y: list of input data (list of floats or ints) (list)
56-
:param model: model name (str)
57-
:raises UnknownException: in case Fit() fails
58-
:return: fit (Any).
53+
Args:
54+
x (list[float] | list[int]): Input x data points.
55+
y (list[float] | list[int]): Input y data points.
56+
model (str): Model name. Defaults to "linear".
57+
58+
Raises:
59+
UnknownException: If constructing the Fit object fails.
60+
61+
Returns:
62+
Fit: Fit object containing the fitted parameters and diagnostics.
5963
"""
6064
try:
6165
self._fit = Fit(x=x, y=y, model=model)
@@ -64,24 +68,29 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any:
6468

6569
return self._fit
6670

67-
def slope(self) -> float:
71+
def slope(self) -> Union[float, None]:
6872
"""
69-
Return the slope of a linear fit, y(x) = slope * x + intersect.
73+
Return the slope of a linear fit y(x) = slope * x + intersect.
7074
71-
:raises NotDefined: exception thrown if fit is not defined.
72-
:return: slope (float).
75+
Raises:
76+
NotDefined: If the fit has not been defined.
77+
78+
Returns:
79+
float | None: The slope value if available, otherwise None.
7380
"""
7481
if not self._fit:
7582
raise NotDefined("Fit has not been defined")
7683

7784
return self._fit.slope()
7885

79-
def intersect(self) -> float:
86+
def intersect(self) -> Union[float, None]:
8087
"""
81-
Return the intersect of a linear fit, y(x) = slope * x + intersect.
88+
Return the intersect of a linear fit y(x) = slope * x + intersect.
8289
83-
:raises NotDefined: exception thrown if fit is not defined
84-
:return: intersect (float).
90+
Raises:
91+
NotDefined: If the fit has not been defined.
92+
93+
Returns:
8594
"""
8695
if not self._fit:
8796
raise NotDefined("Fit has not been defined")
@@ -397,9 +406,9 @@ def set_intersect(self):
397406
"""
398407
Calculate and set the intersect of the linear fit.
399408
"""
400-
if self._ym and self._slope and self._xm:
409+
if self._ym is not None and self._slope is not None and self._xm is not None:
401410
self._intersect = self._ym - self._slope * self._xm
402-
logger.info("-- intersect: %s", self._intersect)
411+
logger.debug(f"{self._ym} - {self._slope} * {self._xm} = {self._intersect}")
403412
else:
404413
self._intersect = None
405414
logger.info("could not calculate intersect")

pilot/api/data.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,71 @@
2222
# - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
2323
# - Alexey Anisenkov, anisyonk@cern.ch, 2018-2024
2424

25-
"""API for data transfers."""
25+
"""
26+
API for data transfers.
27+
28+
This module provides a high-level API for managing data transfers (stage-in and stage-out)
29+
within the Pilot framework. It serves as an abstraction layer over various underlying transfer
30+
protocols and tools, collectively known as "copytools." The primary goal is to provide a
31+
unified interface for staging data, regardless of the specific technology used for the transfer.
32+
33+
Core Classes:
34+
- `StagingClient`: This is the base class that provides the common framework for data staging.
35+
It handles the dynamic selection of copytools based on site configuration and the type of
36+
activity (e.g., 'read_lan', 'write_wan'). It includes methods for resolving file replicas
37+
from catalogs like Rucio, sorting them based on priority (e.g., LAN vs. WAN), and
38+
orchestrating the transfer process through the `transfer` method. It also manages tracing
39+
and logging for transfers.
40+
41+
- `StageInClient`: This class inherits from `StagingClient` and specializes in handling the
42+
stage-in of input files. It contains logic to resolve the best replica for input files,
43+
considering factors like direct access modes (LAN/WAN), allowed schemas (e.g., 'root', 'https'),
44+
and site-specific storage configurations. It is also responsible for checking available
45+
disk space and verifying that input file sizes are within configured limits.
46+
47+
- `StageOutClient`: This class, also inheriting from `StagingClient`, is responsible for
48+
staging out output files. Its key functionality includes preparing destinations by resolving
49+
the correct output storage element (RSE) based on the activity. It constructs the final
50+
destination SURL (Storage URL) for the output files, calculates checksums for verification,
51+
and ensures that output files exist and have a non-zero size before initiating the transfer.
52+
53+
Key Concepts:
54+
- Copytools: The actual file transfers are delegated to specific "copytool" modules, which
55+
are located in the `pilot/copytool/` directory (e.g., `rucio`, `xrdcp`, `gfal`). The
56+
`StagingClient` dynamically imports and uses the appropriate copytool based on the
57+
`acopytools` configuration for a given activity. This design makes the system extensible
58+
to new transfer protocols.
59+
60+
- Replica Resolution: For stage-in, the client queries a catalog (like Rucio) to find all
61+
available replicas (copies) of a file. These replicas are then sorted by priority
62+
(e.g., network proximity, site preference) to select the most efficient source for the
63+
transfer.
64+
65+
- Protocol and Destination Resolution: For stage-out, the client determines the correct
66+
destination storage and the protocol to use for the transfer based on site and experiment
67+
configurations stored in `ddmconf` and `astorages`.
68+
69+
- Direct Access: The `StageInClient` supports a "direct access" or "remote I/O" mode, where
70+
files are not physically copied to the worker node but are accessed remotely by the payload.
71+
The client identifies when this mode is applicable and sets the file status accordingly,
72+
providing the payload with the correct remote TURL (Transport URL).
73+
74+
Workflow:
75+
1. A `StageInClient` or `StageOutClient` is instantiated with site-specific information.
76+
2. The `transfer` method is called with a list of `FileSpec` objects that represent the
77+
files to be transferred.
78+
3. The client determines the appropriate copytool(s) for the given activity.
79+
4. For each copytool, the client prepares the files:
80+
- Stage-in: Resolves replicas, selects the best source URL, and checks for direct
81+
access possibilities.
82+
- Stage-out: Resolves the destination URL and prepares the source file by checking for
83+
its existence and calculating its checksum.
84+
5. The client calls the `copy_in` or `copy_out` function of the selected copytool module,
85+
passing the list of files to be transferred.
86+
6. The copytool executes the transfer.
87+
7. The client updates the status of the `FileSpec` objects and handles any errors. If one
88+
copytool fails, it can try the next one in the configured list.
89+
"""
2690

2791
import os
2892
import hashlib

pilot/common/errorcodes.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ class ErrorCodes:
187187
STAGEOUTAUTHENTICATIONFAILURE = 1383
188188
QUEUENOTSETUPFORCONTAINERS = 1384
189189
NOJOBSINPANDA = 1385 # internally used code
190+
PANDAQUEUENOTONLINE = 1386
190191

191192
_error_messages = {
192193
GENERALERROR: "General pilot error, consult batch log",
@@ -335,6 +336,8 @@ class ErrorCodes:
335336
STAGEOUTAUTHENTICATIONFAILURE: "Authentication failure during stage-out",
336337
QUEUENOTSETUPFORCONTAINERS: "Queue is not set up for containers",
337338
NOJOBSINPANDA: "No jobs in PanDA",
339+
PANDAQUEUENOTONLINE: "PanDA queue is not online",
340+
338341
}
339342

340343
put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]

pilot/control/data.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,33 @@
2323
# - Wen Guan, wen.guan@cern.ch, 2018
2424
# - Alexey Anisenkov, anisyonk@cern.ch, 2018
2525

26-
"""Control interface to data API."""
26+
"""
27+
Control interface to data API.
28+
29+
This module manages all data transfer operations for pilot jobs, including stage-in of input files
30+
and stage-out of output files and logs. It operates as a control layer, orchestrating data transfers
31+
through a system of "copytools."
32+
33+
The core of this module is a set of threads that continuously monitor queues for jobs that require
34+
data transfers:
35+
- `copytool_in`: This thread handles the stage-in of input files for jobs. It retrieves jobs from
36+
the `data_in` queue, determines the appropriate copytool to use, and initiates the transfer.
37+
- `copytool_out`: This thread manages the stage-out of output files. Once a job has finished
38+
execution, it is placed in the `data_out` queue, and this thread handles the upload of
39+
output files to the appropriate storage elements.
40+
- `queue_monitoring`: This thread monitors the status of the data transfer queues and handles
41+
jobs that have either completed or failed the transfer process.
42+
43+
The actual data transfer logic is abstracted away into "copytools," which are individual modules
44+
located in the `pilot/copytool/` directory. Each copytool is responsible for a specific transfer
45+
protocol or technology, such as Rucio, xrdcp, etc. This module interacts with the copytools
46+
through the `StageInClient` and `StageOutClient` classes from the `pilot.api.data` module, which
47+
provide a unified interface for initiating transfers regardless of the underlying copytool used.
48+
49+
The selection of which copytool to use is determined by the site configuration and the protocols
50+
supported by the storage elements involved in the transfer. This allows for a flexible and
51+
extensible data transfer framework that can be adapted to different computing environments.
52+
"""
2753

2854
import logging
2955
import os
@@ -890,6 +916,39 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
890916
"""
891917
Use the `StageOutClient` in the Data API to perform stage-out.
892918
919+
This function orchestrates the stage-out process, including a fallback mechanism known as
920+
"alternative stage-out." This allows the pilot to attempt transferring output files to a
921+
secondary storage element (RSE) if the primary one fails.
922+
923+
The alternative stage-out logic is as follows:
924+
1. **Initial Transfer Attempt**: The function first calls `client.transfer()` to attempt
925+
uploading all files to their primary destination RSEs. Crucially, if alternative
926+
stage-out is enabled for the job (`job.allow_altstageout()` is True), the `transfer`
927+
method is called with `raise_exception=False`. This ensures that if a transfer fails,
928+
the function does not immediately exit but continues execution, allowing for a retry.
929+
930+
2. **Failure Detection**: After the initial attempt, the code checks for any files that
931+
were not successfully transferred by creating a `remain_files` list.
932+
933+
3. **Conditions for Retry**: An alternative stage-out is attempted only if all of the
934+
following conditions are met:
935+
a. The job is configured to allow alternative stage-out (`altstageout` is True).
936+
b. There are files remaining that failed the first transfer attempt (`remain_files` is not empty).
937+
c. Every file that failed has an alternative destination defined (`has_altstorage` is True).
938+
The alternative destination is stored in the `ddmendpoint_alt` attribute of the file spec.
939+
940+
4. **Executing the Alternative Transfer**: If the conditions are met, the function iterates
941+
through the list of failed files. For each file, it swaps the primary destination
942+
(`entry.ddmendpoint`) with the alternative one (`entry.ddmendpoint_alt`).
943+
944+
5. **Second Transfer Attempt**: The `client.transfer()` method is called a second time.
945+
The `StageOutClient` will now attempt to transfer the previously failed files to their
946+
newly assigned alternative destinations. Files that were successfully transferred in the
947+
first attempt are ignored.
948+
949+
The function returns `True` only if all files are successfully transferred after these
950+
attempts.
951+
893952
The rucio host is internally set by Rucio via the client config file. This can be set directly as a pilot option
894953
--rucio-host.
895954

0 commit comments

Comments
 (0)