Skip to content

Commit 1f954b5

Browse files
committed
feat(flow): Flow trigger dependsOn
1 parent 69992cb commit 1f954b5

27 files changed

Lines changed: 1105 additions & 36 deletions

core/src/main/java/io/kestra/core/models/tasks/Task.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ abstract public class Task implements TaskInterface {
7272
private boolean logToFile = false;
7373

7474
@Builder.Default
75-
@PluginProperty(hidden = true, group = "reliability")
75+
@PluginProperty(hidden = true, group = "reliability", dynamic = true)
7676
private String when = "true";
7777

7878
@Builder.Default

core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ abstract public class AbstractTrigger implements TriggerInterface {
5555

5656
@Builder.Default
5757
@NotNull
58-
@PluginProperty(group = "execution")
58+
@PluginProperty(group = "execution", dynamic = true)
5959
@Schema(
6060
title = "A condition that determines whether the trigger should run.",
6161
description = "A Pebble expression evaluated at trigger time. The trigger fires only when the expression evaluates to a truthy value (`true`, a non-empty string, a non-zero number). Use this to gate trigger execution on dynamic runtime values such as execution labels, flow variables, or environment conditions."
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package io.kestra.core.models.triggers;
2+
3+
import io.kestra.core.models.annotations.PluginProperty;
4+
import io.kestra.core.validations.WindowValidation;
5+
import io.swagger.v3.oas.annotations.media.Schema;
6+
import jakarta.validation.constraints.NotNull;
7+
import lombok.Builder;
8+
import lombok.Getter;
9+
10+
import java.time.Duration;
11+
import java.time.LocalTime;
12+
13+
@Builder
14+
@Getter
15+
@WindowValidation
16+
@Schema(
17+
title = "Window configuration",
18+
description = """
19+
Defines the time window within which all `dependsOn` conditions must be met for the trigger to fire.
20+
The window type is inferred from the fields that are set:
21+
- `deadline` set: daily time deadline window (conditions must be met before the given time each day).
22+
- `from` and `to` both set: daily time window (conditions must be met within the given time range each day).
23+
- `lookback` set: sliding window (conditions must be met within the past duration).
24+
- otherwise: duration window (default, conditions must be met within a fixed duration, configurable via `every` and `offset`)."""
25+
)
26+
public class Window {
27+
@Schema(
28+
title = "Daily deadline",
29+
description = "Use this to define a `DAILY_TIME_DEADLINE` window: the `dependsOn` conditions must be met before this time each day. Mutually exclusive with `from`, `to`, `lookback`, `every`, and `offset`."
30+
)
31+
@PluginProperty
32+
private LocalTime deadline;
33+
34+
@Schema(
35+
title = "Daily window start time",
36+
description = "Use this together with `to` to define a `DAILY_TIME_WINDOW`: the `dependsOn` conditions must be met within the time range `[from, to]` each day. Mutually exclusive with `deadline`, `lookback`, `every`, and `offset`."
37+
)
38+
@PluginProperty
39+
private LocalTime from;
40+
41+
@Schema(
42+
title = "Daily window end time",
43+
description = "Use this together with `from` to define a `DAILY_TIME_WINDOW`: the `dependsOn` conditions must be met within the time range `[from, to]` each day. Mutually exclusive with `deadline`, `lookback`, `every`, and `offset`."
44+
)
45+
@PluginProperty
46+
private LocalTime to;
47+
48+
@Schema(
49+
title = "Duration window size",
50+
description = "Use this to define the size of a `DURATION_WINDOW`: the `dependsOn` conditions must be met within a fixed-duration window that advances at the given interval. Defaults to 1 day. Mutually exclusive with `deadline`, `from`, `to`, and `lookback`."
51+
)
52+
@PluginProperty
53+
private Duration every;
54+
55+
@Schema(
56+
title = "Duration window offset",
57+
description = "Use this to shift the start of the `DURATION_WINDOW` relative to midnight. For example, `PT6H` shifts the window start by 6 hours when combined with a 1-day `every`. Mutually exclusive with `deadline`, `from`, `to`, and `lookback`."
58+
)
59+
@PluginProperty
60+
private Duration offset;
61+
62+
@Schema(
63+
title = "Sliding window lookback duration",
64+
description = "Use this to define a `SLIDING_WINDOW`: the `dependsOn` conditions must be met within the past duration relative to the current time. Mutually exclusive with `deadline`, `from`, `to`, `every`, and `offset`."
65+
)
66+
@PluginProperty
67+
private Duration lookback;
68+
69+
@Schema(
70+
title = "Whether the trigger can fire only once per window",
71+
description = """
72+
When `false` (the default), the window state is NOT reset after a successful evaluation, meaning the trigger can fire again within the same window each time conditions are satisfied.
73+
When `true`, after a successful evaluation the window state is reset, so the same set of conditions must be met again within the window to trigger a new execution."""
74+
)
75+
@Builder.Default
76+
@PluginProperty
77+
@NotNull
78+
private boolean fireOnce = false;
79+
80+
/**
81+
* Converts this {@code Window} to a {@link TimeWindow}.
82+
* <p>
83+
* The {@link TimeWindow.Type} is inferred from the fields that are set:
84+
* <ul>
85+
* <li>{@code deadline} set → {@link TimeWindow.Type#DAILY_TIME_DEADLINE}</li>
86+
* <li>{@code from} and {@code to} both set → {@link TimeWindow.Type#DAILY_TIME_WINDOW}</li>
87+
* <li>{@code lookback} set → {@link TimeWindow.Type#SLIDING_WINDOW}</li>
88+
* <li>otherwise → {@link TimeWindow.Type#DURATION_WINDOW} (with {@code every} as window and {@code offset} as windowAdvance)</li>
89+
* </ul>
90+
*
91+
* @return a {@link TimeWindow} equivalent of this window
92+
*/
93+
public TimeWindow toTimeWindow() {
94+
if (deadline != null) {
95+
return TimeWindow.builder()
96+
.type(TimeWindow.Type.DAILY_TIME_DEADLINE)
97+
.deadline(deadline)
98+
.build();
99+
}
100+
if (from != null && to != null) {
101+
return TimeWindow.builder()
102+
.type(TimeWindow.Type.DAILY_TIME_WINDOW)
103+
.startTime(from)
104+
.endTime(to)
105+
.build();
106+
}
107+
if (lookback != null) {
108+
return TimeWindow.builder()
109+
.type(TimeWindow.Type.SLIDING_WINDOW)
110+
.window(lookback)
111+
.build();
112+
}
113+
return TimeWindow.builder()
114+
.type(TimeWindow.Type.DURATION_WINDOW)
115+
.window(every)
116+
.windowAdvance(offset)
117+
.build();
118+
}
119+
}

core/src/main/java/io/kestra/core/models/triggers/multipleflows/MultipleCondition.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import io.kestra.core.models.conditions.ConditionContext;
1111
import io.kestra.core.models.triggers.TimeWindow;
1212
import io.kestra.core.utils.Rethrow;
13-
13+
// FIXME check if we keep it or not, maybe refactor the whole multiple flow handling and simplify it.
14+
// At least, if we keep it, we should make it sealed so it's not implemented wildly.
1415
public interface MultipleCondition extends Rethrow.PredicateChecked<ConditionContext, InternalException> {
1516
String getId();
1617

core/src/main/java/io/kestra/core/services/FlowService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,10 @@ public List<String> warnings(Flow flow, String tenantId) {
455455
.toList();
456456
flowTriggers.forEach(flowTrigger ->
457457
{
458-
if (ListUtils.emptyOnNull(flowTrigger.getConditions()).isEmpty() && flowTrigger.getPreconditions() == null && (flowTrigger.getWhen() == null || "true".equals(flowTrigger.getWhen()))) {
458+
if (ListUtils.emptyOnNull(flowTrigger.getConditions()).isEmpty()
459+
&& flowTrigger.getPreconditions() == null
460+
&& ListUtils.isEmpty(flowTrigger.getDependsOn())
461+
&& (flowTrigger.getWhen() == null || "true".equals(flowTrigger.getWhen()))) {
459462
warnings.add(
460463
"This flow will be triggered for EVERY execution of EVERY flow on your instance. We recommend adding the preconditions property to the Flow trigger '" + flowTrigger.getId()
461464
+ "'."

core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ protected boolean isTriggerChild(Flow parent, Flow child) {
203203
boolean preconditionMatch = flowTriggers.stream()
204204
.anyMatch(flow -> flow.getPreconditions() == null || validatePreconditions(flow.getPreconditions(), parent, execution));
205205

206-
return conditionMatch && preconditionMatch;
206+
boolean dependsOnMatch = flowTriggers.stream()
207+
.anyMatch(flow -> ListUtils.isEmpty(flow.getDependsOn()) || validateDependsOn(flow.getDependsOn(), parent, execution));
208+
209+
return conditionMatch && dependsOnMatch && preconditionMatch;
207210
}
208211

209212
private boolean validateCondition(Condition condition, FlowInterface child, Execution execution) {
@@ -262,6 +265,12 @@ private boolean validatePreconditions(io.kestra.plugin.core.trigger.Flow.Precond
262265
return upstreamFlowMatched && whereMatched;
263266
}
264267

268+
private boolean validateDependsOn(List<io.kestra.plugin.core.trigger.Flow.Dependency> dependsOn, FlowInterface child, Execution execution) {
269+
return ListUtils.emptyOnNull(dependsOn)
270+
.stream()
271+
.anyMatch(c -> validateCondition(c.asCondition(), child, execution));
272+
}
273+
265274
private boolean isFilterCondition(Condition condition) {
266275
return Stream
267276
.of(
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.kestra.core.validations;
2+
3+
import java.lang.annotation.Retention;
4+
import java.lang.annotation.RetentionPolicy;
5+
6+
import io.kestra.core.validations.validator.WindowValidator;
7+
8+
import jakarta.validation.Constraint;
9+
import jakarta.validation.Payload;
10+
11+
@Retention(RetentionPolicy.RUNTIME)
12+
@Constraint(validatedBy = WindowValidator.class)
13+
public @interface WindowValidation {
14+
String message() default "invalid window definition";
15+
16+
Class<?>[] groups() default {};
17+
18+
Class<? extends Payload>[] payload() default {};
19+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.kestra.core.validations.validator;
2+
3+
import io.kestra.core.models.triggers.Window;
4+
import io.kestra.core.validations.WindowValidation;
5+
6+
import io.micronaut.core.annotation.AnnotationValue;
7+
import io.micronaut.core.annotation.NonNull;
8+
import io.micronaut.core.annotation.Nullable;
9+
import io.micronaut.validation.validator.constraints.ConstraintValidator;
10+
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
11+
import jakarta.inject.Singleton;
12+
13+
@Singleton
14+
public class WindowValidator implements ConstraintValidator<WindowValidation, Window> {
15+
16+
@Override
17+
public boolean isValid(
18+
@Nullable Window value,
19+
@NonNull AnnotationValue<WindowValidation> annotationMetadata,
20+
@NonNull ConstraintValidatorContext context) {
21+
if (value == null) {
22+
return true;
23+
}
24+
25+
if (value.getDeadline() != null) {
26+
return validateDailyTimeDeadline(value, context);
27+
}
28+
if (value.getFrom() != null && value.getTo() != null) {
29+
return validateDailyTimeWindow(value, context);
30+
}
31+
if (value.getFrom() != null || value.getTo() != null) {
32+
context.disableDefaultConstraintViolation();
33+
if (value.getFrom() == null) {
34+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` must have a from time.").addConstraintViolation();
35+
} else {
36+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` must have a to time.").addConstraintViolation();
37+
}
38+
return false;
39+
}
40+
if (value.getLookback() != null) {
41+
return validateSlidingWindow(value, context);
42+
}
43+
return true;
44+
}
45+
46+
private boolean validateDailyTimeDeadline(Window value, ConstraintValidatorContext context) {
47+
if (value.getEvery() != null || value.getOffset() != null || value.getFrom() != null || value.getTo() != null || value.getLookback() != null) {
48+
context.disableDefaultConstraintViolation();
49+
if (value.getEvery() != null) {
50+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_DEADLINE` cannot have an every duration.").addConstraintViolation();
51+
}
52+
if (value.getOffset() != null) {
53+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_DEADLINE` cannot have an offset.").addConstraintViolation();
54+
}
55+
if (value.getFrom() != null) {
56+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_DEADLINE` cannot have a from time.").addConstraintViolation();
57+
}
58+
if (value.getTo() != null) {
59+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_DEADLINE` cannot have a to time.").addConstraintViolation();
60+
}
61+
if (value.getLookback() != null) {
62+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_DEADLINE` cannot have a lookback duration.").addConstraintViolation();
63+
}
64+
return false;
65+
}
66+
return true;
67+
}
68+
69+
private boolean validateDailyTimeWindow(Window value, ConstraintValidatorContext context) {
70+
if (value.getDeadline() != null || value.getEvery() != null || value.getOffset() != null || value.getLookback() != null) {
71+
context.disableDefaultConstraintViolation();
72+
if (value.getDeadline() != null) {
73+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` cannot have a deadline.").addConstraintViolation();
74+
}
75+
if (value.getEvery() != null) {
76+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` cannot have an every duration.").addConstraintViolation();
77+
}
78+
if (value.getOffset() != null) {
79+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` cannot have an offset.").addConstraintViolation();
80+
}
81+
if (value.getLookback() != null) {
82+
context.buildConstraintViolationWithTemplate("Window of type `DAILY_TIME_WINDOW` cannot have a lookback duration.").addConstraintViolation();
83+
}
84+
return false;
85+
}
86+
return true;
87+
}
88+
89+
private boolean validateSlidingWindow(Window value, ConstraintValidatorContext context) {
90+
if (value.getDeadline() != null || value.getFrom() != null || value.getTo() != null || value.getOffset() != null || value.getEvery() != null) {
91+
context.disableDefaultConstraintViolation();
92+
if (value.getDeadline() != null) {
93+
context.buildConstraintViolationWithTemplate("Window of type `SLIDING_WINDOW` cannot have a deadline.").addConstraintViolation();
94+
}
95+
if (value.getFrom() != null) {
96+
context.buildConstraintViolationWithTemplate("Window of type `SLIDING_WINDOW` cannot have a from time.").addConstraintViolation();
97+
}
98+
if (value.getTo() != null) {
99+
context.buildConstraintViolationWithTemplate("Window of type `SLIDING_WINDOW` cannot have a to time.").addConstraintViolation();
100+
}
101+
if (value.getOffset() != null) {
102+
context.buildConstraintViolationWithTemplate("Window of type `SLIDING_WINDOW` cannot have an offset.").addConstraintViolation();
103+
}
104+
if (value.getEvery() != null) {
105+
context.buildConstraintViolationWithTemplate("Window of type `SLIDING_WINDOW` cannot have an every duration.").addConstraintViolation();
106+
}
107+
return false;
108+
}
109+
return true;
110+
}
111+
}

0 commit comments

Comments
 (0)