diff --git a/examples/multi/repo_1/linter/__init__.py b/examples/multi/repo_1/linter/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 93d3051bcd..ac1b69ee62 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -520,6 +520,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None: help="Explain the plan instead of applying it.", default=None, ) +@click.option( + "--min-intervals", + default=0, + help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date", +) @opt.verbose @click.pass_context @error_handler diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index 2435e56ea8..ae48722288 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -2073,8 +2073,8 @@ def _prompt_backfill( if not plan_builder.override_end: if plan.provided_end: blank_meaning = f"'{time_like_to_str(plan.provided_end)}'" - elif plan.interval_end_per_model: - max_end = max(plan.interval_end_per_model.values()) + elif plan.end_override_per_model: + max_end = max(plan.end_override_per_model.values()) blank_meaning = f"'{time_like_to_str(max_end)}'" else: blank_meaning = "now" diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 4203e35739..c8cfbda03c 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -46,6 +46,7 @@ from pathlib import Path from shutil import rmtree from types import MappingProxyType +from datetime import datetime from sqlglot import Dialect, exp from sqlglot.helper import first @@ -125,6 +126,8 @@ format_tz_datetime, now_timestamp, now, + to_datetime, + make_exclusive, ) from sqlmesh.utils.errors import ( CircuitBreakerError, @@ -1222,6 +1225,7 @@ def plan( diff_rendered: t.Optional[bool] = None, skip_linter: t.Optional[bool] = None, explain: t.Optional[bool] = None, + min_intervals: t.Optional[int] = None, ) -> Plan: """Interactively creates a 
plan. @@ -1268,6 +1272,8 @@ def plan( diff_rendered: Whether the diff should compare raw vs rendered models skip_linter: Linter runs by default so this will skip it if enabled explain: Whether to explain the plan instead of applying it. + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered + on every model when checking for missing intervals Returns: The populated Plan object. @@ -1296,6 +1302,7 @@ def plan( diff_rendered=diff_rendered, skip_linter=skip_linter, explain=explain, + min_intervals=min_intervals, ) plan = plan_builder.build() @@ -1345,6 +1352,7 @@ def plan_builder( diff_rendered: t.Optional[bool] = None, skip_linter: t.Optional[bool] = None, explain: t.Optional[bool] = None, + min_intervals: t.Optional[int] = None, ) -> PlanBuilder: """Creates a plan builder. @@ -1381,6 +1389,8 @@ def plan_builder( enable_preview: Indicates whether to enable preview for forward-only models in development environments. run: Whether to run latest intervals as part of the plan application. diff_rendered: Whether the diff should compare raw vs rendered models + min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered + on every model when checking for missing intervals Returns: The plan builder. @@ -1408,6 +1418,7 @@ def plan_builder( "run": run, "diff_rendered": diff_rendered, "skip_linter": skip_linter, + "min_intervals": min_intervals, } user_provided_flags: t.Dict[str, UserProvidedFlags] = { k: v for k, v in kwargs.items() if v is not None @@ -1530,6 +1541,16 @@ def plan_builder( # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. 
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) + start_override_per_model = self._calculate_start_override_per_model( + min_intervals, + start or default_start, + end or default_end, + execution_time or now(), + backfill_models, + snapshots, + max_interval_end_per_model, + ) + return self.PLAN_BUILDER_TYPE( context_diff=context_diff, start=start, @@ -1560,7 +1581,8 @@ def plan_builder( ), end_bounded=not run, ensure_finalized_snapshots=self.config.plan.use_finalized_state, - interval_end_per_model=max_interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=max_interval_end_per_model, console=self.console, user_provided_flags=user_provided_flags, explain=explain or False, @@ -2850,7 +2872,7 @@ def _plan_preview_enabled(self) -> bool: def _get_plan_default_start_end( self, snapshots: t.Dict[str, Snapshot], - max_interval_end_per_model: t.Dict[str, int], + max_interval_end_per_model: t.Dict[str, datetime], backfill_models: t.Optional[t.Set[str]], modified_model_names: t.Set[str], execution_time: t.Optional[TimeLike] = None, @@ -2858,7 +2880,7 @@ def _get_plan_default_start_end( if not max_interval_end_per_model: return None, None - default_end = max(max_interval_end_per_model.values()) + default_end = to_timestamp(max(max_interval_end_per_model.values())) default_start: t.Optional[int] = None # Infer the default start by finding the smallest interval start that corresponds to the default end. 
for model_name in backfill_models or modified_model_names or max_interval_end_per_model: @@ -2887,19 +2909,101 @@ def _get_plan_default_start_end( return default_start, default_end + def _calculate_start_override_per_model( + self, + min_intervals: t.Optional[int], + plan_start: t.Optional[TimeLike], + plan_end: t.Optional[TimeLike], + plan_execution_time: TimeLike, + backfill_model_fqns: t.Optional[t.Set[str]], + snapshots_by_model_fqn: t.Dict[str, Snapshot], + end_override_per_model: t.Optional[t.Dict[str, datetime]], + ) -> t.Dict[str, datetime]: + if not min_intervals or not backfill_model_fqns or not plan_start: + # If there are no models to backfill, there are no intervals to consider for backfill, so we don't need to consider a minimum number + # If the plan doesn't have a start date, all intervals are considered already so we don't need to consider a minimum number + # If we don't have a minimum number of intervals to consider, then we don't need to adjust the start date on a per-model basis + return {} + + start_overrides: t.Dict[str, datetime] = {} + end_override_per_model = end_override_per_model or {} + + plan_execution_time_dt = to_datetime(plan_execution_time) + plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt) + plan_end_dt = to_datetime( + plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt + ) + + # we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children + # for example, A(hourly) <- B(daily) + # if min_intervals=1, A would have 1 hour and B would have 1 day + # but B depends on A so in order for B to have 1 valid day, A needs to be expanded to 24 hours + backfill_dag: DAG[str] = DAG() + for fqn in backfill_model_fqns: + backfill_dag.add( + fqn, + [ + p.name + for p in snapshots_by_model_fqn[fqn].parents + if p.name in backfill_model_fqns + ], + ) + + # start from the leaf nodes and work back towards the root because the min_start at the root
node is determined by the calculated starts in the leaf nodes + reversed_dag = backfill_dag.reversed + graph = reversed_dag.graph + + for model_fqn in reversed_dag: + # Get the earliest start from all immediate children of this snapshot + # this works because topological ordering guarantees that they've already been visited + # and we always set a start override + min_child_start = min( + [start_overrides[immediate_child_fqn] for immediate_child_fqn in graph[model_fqn]], + default=plan_start_dt, + ) + + snapshot = snapshots_by_model_fqn.get(model_fqn) + + if not snapshot: + continue + + starting_point = end_override_per_model.get(model_fqn, plan_end_dt) + if node_end := snapshot.node.end: + # if we don't do this, if the node end is a *date* (as opposed to a timestamp) + # we end up incorrectly winding back an extra day + node_end_dt = make_exclusive(node_end) + + if node_end_dt < plan_end_dt: + # if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals + # instead of the plan end.
If we use the plan end, we will return intervals in the future which are invalid + starting_point = node_end_dt + + snapshot_start = snapshot.node.cron_floor(starting_point) + + for _ in range(min_intervals): + # wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date + snapshot_start = snapshot.node.cron_prev(snapshot_start) + + start_overrides[model_fqn] = min(min_child_start, snapshot_start) + + return start_overrides + def _get_max_interval_end_per_model( self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]] - ) -> t.Dict[str, int]: + ) -> t.Dict[str, datetime]: models_for_interval_end = ( self._get_models_for_interval_end(snapshots, backfill_models) if backfill_models is not None else None ) - return self.state_sync.max_interval_end_per_model( - c.PROD, - models=models_for_interval_end, - ensure_finalized_snapshots=self.config.plan.use_finalized_state, - ) + return { + model_fqn: to_datetime(ts) + for model_fqn, ts in self.state_sync.max_interval_end_per_model( + c.PROD, + models=models_for_interval_end, + ensure_finalized_snapshots=self.config.plan.use_finalized_state, + ).items() + } @staticmethod def _get_models_for_interval_end( diff --git a/sqlmesh/core/node.py b/sqlmesh/core/node.py index 98a24884cd..874e74b3e9 100644 --- a/sqlmesh/core/node.py +++ b/sqlmesh/core/node.py @@ -31,6 +31,9 @@ class IntervalUnit(str, Enum): IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred based on the cron schedule of a node. The minimum time delta between a sample set of dates is used to determine which unit a node's schedule is. 
+ + It's designed to align with common partitioning schemes, hence why there is no WEEK unit + because generally tables are not partitioned by week """ YEAR = "year" diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index ff953c75a2..567920997e 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -5,6 +5,7 @@ import typing as t from collections import defaultdict from functools import cached_property +from datetime import datetime from sqlmesh.core.console import PlanBuilderConsole, get_console @@ -85,7 +86,8 @@ class PlanBuilder: ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized environment state, or to use whatever snapshots are in the current environment state even if the environment is not finalized. - interval_end_per_model: The mapping from model FQNs to target end dates. + start_override_per_model: A mapping of model FQNs to target start dates. + end_override_per_model: A mapping of model FQNs to target end dates. explain: Whether to explain the plan instead of applying it. 
""" @@ -117,7 +119,8 @@ def __init__( end_bounded: bool = False, ensure_finalized_snapshots: bool = False, explain: bool = False, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, console: t.Optional[PlanBuilderConsole] = None, user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None, ): @@ -133,7 +136,8 @@ def __init__( self._enable_preview = enable_preview self._end_bounded = end_bounded self._ensure_finalized_snapshots = ensure_finalized_snapshots - self._interval_end_per_model = interval_end_per_model + self._start_override_per_model = start_override_per_model + self._end_override_per_model = end_override_per_model self._environment_ttl = environment_ttl self._categorizer_config = categorizer_config or CategorizerConfig() self._auto_categorization_enabled = auto_categorization_enabled @@ -280,7 +284,11 @@ def build(self) -> Plan: self._adjust_new_snapshot_intervals() deployability_index = ( - DeployabilityIndex.create(self._context_diff.snapshots.values(), start=self._start) + DeployabilityIndex.create( + self._context_diff.snapshots.values(), + start=self._start, + start_override_per_model=self._start_override_per_model, + ) if self._is_dev else DeployabilityIndex.all_deployable() ) @@ -291,11 +299,11 @@ def build(self) -> Plan: ) models_to_backfill = self._build_models_to_backfill(dag, restatements) - interval_end_per_model = self._interval_end_per_model - if interval_end_per_model and self.override_end: + end_override_per_model = self._end_override_per_model + if end_override_per_model and self.override_end: # If the end date was provided explicitly by a user, then interval end for each individual # model should be ignored. 
- interval_end_per_model = None + end_override_per_model = None # this deliberately uses the passed in self._execution_time and not self.execution_time cached property # the reason is because that there can be a delay between the Plan being built and the Plan being actually run, @@ -322,7 +330,8 @@ def build(self) -> Plan: indirectly_modified=indirectly_modified, deployability_index=deployability_index, restatements=restatements, - interval_end_per_model=interval_end_per_model, + start_override_per_model=self._start_override_per_model, + end_override_per_model=end_override_per_model, selected_models_to_backfill=self._backfill_models, models_to_backfill=models_to_backfill, effective_from=self._effective_from, diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 7325dc3532..584c0d9b51 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -57,7 +57,8 @@ class Plan(PydanticModel, frozen=True): deployability_index: DeployabilityIndex restatements: t.Dict[SnapshotId, Interval] - interval_end_per_model: t.Optional[t.Dict[str, int]] + start_override_per_model: t.Optional[t.Dict[str, datetime]] + end_override_per_model: t.Optional[t.Dict[str, datetime]] selected_models_to_backfill: t.Optional[t.Set[str]] = None """Models that have been explicitly selected for backfill by a user.""" @@ -177,7 +178,8 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]: execution_time=self.execution_time, restatements=self.restatements, deployability_index=self.deployability_index, - interval_end_per_model=self.interval_end_per_model, + start_override_per_model=self.start_override_per_model, + end_override_per_model=self.end_override_per_model, end_bounded=self.end_bounded, ).items() if snapshot.is_model and missing @@ -265,7 +267,8 @@ def to_evaluatable(self) -> EvaluatablePlan: removed_snapshots=sorted(self.context_diff.removed_snapshots), requires_backfill=self.requires_backfill, 
models_to_backfill=self.models_to_backfill, - interval_end_per_model=self.interval_end_per_model, + start_override_per_model=self.start_override_per_model, + end_override_per_model=self.end_override_per_model, execution_time=self.execution_time, disabled_restatement_models={ s.name @@ -303,7 +306,8 @@ class EvaluatablePlan(PydanticModel): removed_snapshots: t.List[SnapshotId] requires_backfill: bool models_to_backfill: t.Optional[t.Set[str]] = None - interval_end_per_model: t.Optional[t.Dict[str, int]] = None + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None execution_time: t.Optional[TimeLike] = None disabled_restatement_models: t.Set[str] environment_statements: t.Optional[t.List[EnvironmentStatements]] = None diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index a8e2aa7919..03f8bdcf71 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -256,7 +256,8 @@ def visit_audit_only_run_stage( plan.end, execution_time=plan.execution_time, end_bounded=plan.end_bounded, - interval_end_per_model=plan.interval_end_per_model, + start_override_per_model=plan.start_override_per_model, + end_override_per_model=plan.end_override_per_model, ) if completion_status.is_failure: diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 863e53387f..9913a87bd0 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -524,7 +524,8 @@ def _missing_intervals( }, deployability_index=deployability_index, end_bounded=plan.end_bounded, - interval_end_per_model=plan.interval_end_per_model, + start_override_per_model=plan.start_override_per_model, + end_override_per_model=plan.end_override_per_model, ) def _get_audit_only_snapshots( diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 2c7a2a66ac..2b9cb6189f 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -2,6 +2,7 @@ 
import logging import typing as t import time +from datetime import datetime from sqlglot import exp from sqlmesh.core import constants as c from sqlmesh.core.console import Console, get_console @@ -105,7 +106,8 @@ def merged_missing_intervals( execution_time: t.Optional[TimeLike] = None, deployability_index: t.Optional[DeployabilityIndex] = None, restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: t.Optional[t.Set[str]] = None, @@ -123,7 +125,8 @@ def merged_missing_intervals( execution_time: The date/time reference to use for execution time. Defaults to now. deployability_index: Determines snapshots that are deployable in the context of this evaluation. restatements: A set of snapshot names being restated. - interval_end_per_model: The mapping from model FQNs to target end dates. + start_override_per_model: A mapping of model FQNs to target start dates. + end_override_per_model: A mapping of model FQNs to target end dates. ignore_cron: Whether to ignore the node's cron schedule. end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date. 
@@ -136,7 +139,8 @@ def merged_missing_intervals( execution_time=execution_time, deployability_index=deployability_index, restatements=restatements, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, ) @@ -212,7 +216,8 @@ def run( end: t.Optional[TimeLike] = None, execution_time: t.Optional[TimeLike] = None, restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: t.Optional[t.Set[str]] = None, @@ -227,7 +232,8 @@ def run( end=end, execution_time=execution_time, remove_intervals=restatements, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, selected_snapshots=selected_snapshots, @@ -243,7 +249,8 @@ def audit( start: TimeLike, end: TimeLike, execution_time: t.Optional[TimeLike] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: t.Optional[t.Set[str]] = None, @@ -266,7 +273,8 @@ def audit( end=end, execution_time=execution_time, remove_intervals=remove_intervals, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, selected_snapshots=selected_snapshots, @@ -565,7 +573,8 @@ def _run_or_audit( end: t.Optional[TimeLike] = 
None, execution_time: t.Optional[TimeLike] = None, remove_intervals: t.Optional[t.Dict[SnapshotId, Interval]] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, selected_snapshots: t.Optional[t.Set[str]] = None, @@ -586,7 +595,8 @@ def _run_or_audit( execution_time: The date/time time reference to use for execution time. Defaults to now. remove_intervals: A dict of snapshots to their intervals. For evaluation, these are the intervals that will be restated. For audits, these are the intervals that will be reaudited - interval_end_per_model: The mapping from model FQNs to target end dates. + start_override_per_model: A mapping of model FQNs to target start dates. + end_override_per_model: A mapping of model FQNs to target end dates. ignore_cron: Whether to ignore the node's cron schedule. end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date. 
@@ -634,7 +644,8 @@ def _run_or_audit( execution_time, deployability_index=deployability_index, restatements=remove_intervals, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, selected_snapshots=selected_snapshots, @@ -797,7 +808,8 @@ def merged_missing_intervals( execution_time: t.Optional[TimeLike] = None, deployability_index: t.Optional[DeployabilityIndex] = None, restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, ) -> SnapshotToIntervals: @@ -815,7 +827,8 @@ def merged_missing_intervals( execution_time: The date/time reference to use for execution time. Defaults to now. deployability_index: Determines snapshots that are deployable in the context of this evaluation. restatements: A set of snapshot names being restated. - interval_end_per_model: The mapping from model FQNs to target end dates. + start_override_per_model: A mapping of model FQNs to target start dates. + end_override_per_model: A mapping of model FQNs to target end dates. ignore_cron: Whether to ignore the node's cron schedule. end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date. 
@@ -830,7 +843,8 @@ def merged_missing_intervals( deployability_index=deployability_index, execution_time=execution_time or now_timestamp(), restatements=restatements, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, ) @@ -844,7 +858,8 @@ def compute_interval_params( deployability_index: t.Optional[DeployabilityIndex] = None, execution_time: t.Optional[TimeLike] = None, restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, ) -> SnapshotToIntervals: @@ -862,7 +877,8 @@ def compute_interval_params( deployability_index: Determines snapshots that are deployable in the context of this evaluation. execution_time: The date/time reference to use for execution time. restatements: A dict of snapshot names being restated and their intervals. - interval_end_per_model: The mapping from model FQNs to target end dates. + start_override_per_model: A mapping of model FQNs to target start dates. + end_override_per_model: A mapping of model FQNs to target end dates. ignore_cron: Whether to ignore the node's cron schedule. end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback, allow_partials, and other attributes that could cause the intervals to exceed the target end date. 
@@ -879,7 +895,8 @@ def compute_interval_params( execution_time=execution_time, restatements=restatements, deployability_index=deployability_index, - interval_end_per_model=interval_end_per_model, + start_override_per_model=start_override_per_model, + end_override_per_model=end_override_per_model, ignore_cron=ignore_cron, end_bounded=end_bounded, ).items(): diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 9b5fa893fc..b6f94108a1 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1469,7 +1469,8 @@ def none_deployable(cls) -> DeployabilityIndex: def create( cls, snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot], - start: t.Optional[TimeLike] = None, + start: t.Optional[TimeLike] = None, # plan start + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ) -> DeployabilityIndex: if not isinstance(snapshots, dict): snapshots = {s.snapshot_id: s for s in snapshots} @@ -1477,6 +1478,7 @@ def create( deployability_mapping: t.Dict[SnapshotId, bool] = {} children_deployability_mapping: t.Dict[SnapshotId, bool] = {} representative_shared_version_ids: t.Set[SnapshotId] = set() + start_override_per_model = start_override_per_model or {} start_date_cache: t.Optional[t.Dict[str, datetime]] = {} @@ -1499,12 +1501,12 @@ def create( snapshot.is_model and snapshot.model.auto_restatement_cron is not None ) + snapshot_start = start_override_per_model.get( + node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache) + ) + is_valid_start = ( - snapshot.is_valid_start( - start, start_date(snapshot, snapshots.values(), start_date_cache) - ) - if start is not None - else True + snapshot.is_valid_start(start, snapshot_start) if start is not None else True ) if ( @@ -1800,7 +1802,8 @@ def missing_intervals( execution_time: t.Optional[TimeLike] = None, restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None, deployability_index: 
t.Optional[DeployabilityIndex] = None, - interval_end_per_model: t.Optional[t.Dict[str, int]] = None, + start_override_per_model: t.Optional[t.Dict[str, datetime]] = None, + end_override_per_model: t.Optional[t.Dict[str, datetime]] = None, ignore_cron: bool = False, end_bounded: bool = False, ) -> t.Dict[Snapshot, Intervals]: @@ -1817,13 +1820,15 @@ def missing_intervals( else earliest_start_date(snapshots, cache=cache, relative_to=end_date) ) restatements = restatements or {} - interval_end_per_model = interval_end_per_model or {} + start_override_per_model = start_override_per_model or {} + end_override_per_model = end_override_per_model or {} deployability_index = deployability_index or DeployabilityIndex.all_deployable() for snapshot in snapshots.values(): if not snapshot.evaluatable: continue - snapshot_start_date = start_dt + + snapshot_start_date = start_override_per_model.get(snapshot.name, start_dt) snapshot_end_date: TimeLike = end_date restated_interval = restatements.get(snapshot.snapshot_id) @@ -1833,9 +1838,9 @@ def missing_intervals( snapshot.intervals = snapshot.intervals.copy() snapshot.remove_interval(restated_interval) - existing_interval_end = interval_end_per_model.get(snapshot.name) + existing_interval_end = end_override_per_model.get(snapshot.name) if existing_interval_end: - if to_timestamp(snapshot_start_date) >= existing_interval_end: + if snapshot_start_date >= existing_interval_end: # The start exceeds the provided interval end, so we can skip this snapshot # since it doesn't have missing intervals by definition continue diff --git a/tests/core/test_context.py b/tests/core/test_context.py index a8d2d04da9..0728168a0f 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -2,7 +2,7 @@ import pathlib import typing as t import re -from datetime import date, timedelta +from datetime import date, timedelta, datetime from tempfile import TemporaryDirectory from unittest.mock import PropertyMock, call, patch @@ -36,6 +36,7 
@@ from sqlmesh.core.dialect import parse, schema_ from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements +from sqlmesh.core.plan.definition import Plan from sqlmesh.core.macros import MacroEvaluator, RuntimeStage from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model from sqlmesh.core.model.cache import OptimizedQueryCache @@ -2184,6 +2185,7 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, caplog): plan = ctx.plan( environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01" ) + assert plan.missing_intervals date_snapshot = next(s for s in plan.new_snapshots if "date_example" in s.name) timestamp_snapshot = next(s for s in plan.new_snapshots if "timestamp_example" in s.name) @@ -2408,3 +2410,324 @@ def test_table_diff_ignores_extra_args(sushi_context: Context): show_sample=True, some_tcloud_option=1_000, ) + + +def test_plan_min_intervals(tmp_path: Path): + init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb") + + context = Context( + paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + ) + + current_time = to_datetime("2020-02-01 00:00:01") + + # initial state of example project + context.plan(auto_apply=True, execution_time=current_time) + + (tmp_path / "models" / "daily_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.daily_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '@daily' + ); + + select @start_ds as start_ds, @end_ds as end_ds, @start_dt as start_dt, @end_dt as end_dt; + """) + + (tmp_path / "models" / "weekly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.weekly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '@weekly' + ); + + select @start_ds as start_ds, @end_ds as end_ds, @start_dt as start_dt, @end_dt as end_dt; 
+ """) + + (tmp_path / "models" / "monthly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.monthly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '@monthly' + ); + + select @start_ds as start_ds, @end_ds as end_ds, @start_dt as start_dt, @end_dt as end_dt; + """) + + (tmp_path / "models" / "ended_daily_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.ended_daily_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + end '2020-01-18', + cron '@daily' + ); + + select @start_ds as start_ds, @end_ds as end_ds, @start_dt as start_dt, @end_dt as end_dt; + """) + + context.load() + + # initial state - backfill from 2020-01-01 -> now() (2020-01-02 00:00:01) on new models + plan = context.plan(execution_time=current_time) + + assert to_datetime(plan.start) == to_datetime("2020-01-01 00:00:00") + assert to_datetime(plan.end) == to_datetime("2020-02-01 00:00:00") + assert to_datetime(plan.execution_time) == to_datetime("2020-02-01 00:00:01") + + def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, datetime]]: + snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id + snapshot_intervals = next( + si for si in plan.missing_intervals if si.snapshot_id == snapshot_id + ) + return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals] + + # check initial intervals - should be full time range between start and execution time + assert len(plan.missing_intervals) == 4 + + assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [ + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [ + ( + to_datetime("2020-01-01 00:00:00"), + to_datetime("2020-01-26 00:00:00"), + ) # last week in 2020-01 hasnt fully elapsed yet + ] + assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == 
[ + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + assert _get_missing_intervals(plan, "sqlmesh_example.ended_daily_model") == [ + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-19 00:00:00")) + ] + + # now, create a dev env for "1 day ago" with min_intervals=1 + plan = context.plan( + environment="pr_env", + start="1 day ago", + execution_time=current_time, + min_intervals=1, + ) + + # this should pick up last day for daily model, last week for weekly model, last month for the monthly model and the last day of "ended_daily_model" before it ended + assert len(plan.missing_intervals) == 4 + + assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [ + (to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [ + ( + to_datetime("2020-01-19 00:00:00"), # last completed week + to_datetime("2020-01-26 00:00:00"), + ) + ] + assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [ + ( + to_datetime("2020-01-01 00:00:00"), # last completed month + to_datetime("2020-02-01 00:00:00"), + ) + ] + assert _get_missing_intervals(plan, "sqlmesh_example.ended_daily_model") == [ + ( + to_datetime("2020-01-18 00:00:00"), # last day before the model end date + to_datetime("2020-01-19 00:00:00"), + ) + ] + + # run the plan for '1 day ago' but min_intervals=1 + context.apply(plan) + + # show that the data was created (which shows that when the Plan became an EvaluatablePlan and eventually evaluated, the start date overrides didnt get dropped) + assert context.engine_adapter.fetchall( + "select start_dt, end_dt from sqlmesh_example__pr_env.daily_model" + ) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))] + assert context.engine_adapter.fetchall( + "select start_dt, end_dt from sqlmesh_example__pr_env.weekly_model" + ) == [ + (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 
23:59:59.999999")), + ] + assert context.engine_adapter.fetchall( + "select start_dt, end_dt from sqlmesh_example__pr_env.monthly_model" + ) == [ + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-31 23:59:59.999999")), + ] + assert context.engine_adapter.fetchall( + "select start_dt, end_dt from sqlmesh_example__pr_env.ended_daily_model" + ) == [ + (to_datetime("2020-01-18 00:00:00"), to_datetime("2020-01-18 23:59:59.999999")), + ] + + +def test_plan_min_intervals_adjusted_for_downstream(tmp_path: Path): + """ + Scenario: + A(hourly) <- B(daily) <- C(weekly) + <- D(two-hourly) + E(monthly) + + We need to ensure that :min_intervals covers at least :min_intervals of all downstream models for the dag to be valid + In this scenario, if min_intervals=1: + - A would need to cover at least (7 days * 24 hours) because its downstream model C is weekly. It should also be unaffected by its sibling, E + - B would need to cover at least 7 days because its downstream model C is weekly + - C would need to cover at least 1 week because min_intervals: 1 + - D would need to cover at least 2 hours because min_intervals: 1 and should be unaffected by C + - E is unrelated to A, B, C and D so would need to cover 1 month satisfy min_intervals: 1. 
+ - It also ensures that each tree branch has a unique cumulative date, because + if the dag is iterated purely in topological order with a global min date it would set A to 1 month instead of 1 week + """ + + init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb") + + context = Context( + paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + ) + + current_time = to_datetime("2020-02-01 00:00:01") + + # initial state of example project + context.plan(auto_apply=True, execution_time=current_time) + + (tmp_path / "models" / "hourly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.hourly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt, + batch_size 1 + ), + start '2020-01-01', + cron '@hourly' + ); + + select @start_dt as start_dt, @end_dt as end_dt; + """) + + (tmp_path / "models" / "two_hourly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.two_hourly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '0 */2 * * *' + ); + + select start_dt, end_dt from sqlmesh_example.hourly_model where start_dt between @start_dt and @end_dt; + """) + + (tmp_path / "models" / "unrelated_monthly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.unrelated_monthly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '@monthly' + ); + + select @start_dt as start_dt, @end_dt as end_dt; + """) + + (tmp_path / "models" / "daily_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.daily_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start '2020-01-01', + cron '@daily' + ); + + select start_dt, end_dt from sqlmesh_example.hourly_model where start_dt between @start_dt and @end_dt; + """) + + (tmp_path / "models" / "weekly_model.sql").write_text(""" + MODEL ( + name sqlmesh_example.weekly_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column start_dt + ), + start 
'2020-01-01', + cron '@weekly' + ); + + select start_dt, end_dt from sqlmesh_example.daily_model where start_dt between @start_dt and @end_dt; + """) + + context.load() + + # create a dev env for "1 day ago" with min_intervals=1 + # this should force a weeks worth of intervals for every model + plan = context.plan( + environment="pr_env", + start="1 day ago", + execution_time=current_time, + min_intervals=1, + ) + + def _get_missing_intervals(name: str) -> t.List[t.Tuple[datetime, datetime]]: + snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id + snapshot_intervals = next( + si for si in plan.missing_intervals if si.snapshot_id == snapshot_id + ) + return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals] + + # We only operate on completed intervals, so given the current_time this is the range of the last completed week + assert _get_missing_intervals("sqlmesh_example.weekly_model") == [ + (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-26 00:00:00")) + ] + + # The daily model needs to cover the week, so it gets its start date moved back to line up + assert _get_missing_intervals("sqlmesh_example.daily_model") == [ + (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + + # The hourly model needs to cover both the daily model and the weekly model, so it also gets its start date moved back to line up with the weekly model + assert _get_missing_intervals("sqlmesh_example.hourly_model") == [ + (to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + + # The two-hourly model only needs to cover 2 hours and should be unaffected by the fact its sibling node has a weekly child node + # However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1 + assert _get_missing_intervals("sqlmesh_example.two_hourly_model") == [ + (to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + + # The unrelated model 
has no upstream constraints, so its start date doesn't get moved to line up with the weekly model + # However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1 + assert _get_missing_intervals("sqlmesh_example.unrelated_monthly_model") == [ + (to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00")) + ] + + # Check that actually running the plan produces the correct result, since missing intervals are re-calculated in the evaluator + context.apply(plan) + + assert context.engine_adapter.fetchall( + "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.weekly_model" + ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 23:59:59.999999"))] + + assert context.engine_adapter.fetchall( + "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.daily_model" + ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))] + + assert context.engine_adapter.fetchall( + "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.hourly_model" + ) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))] + + assert context.engine_adapter.fetchall( + "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.two_hourly_model" + ) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))] + + assert context.engine_adapter.fetchall( + "select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.unrelated_monthly_model" + ) == [(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))] diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 06dbd7d2d3..765a45f7c8 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -756,7 +756,8 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture): restatements={}, end_bounded=False, ensure_finalized_snapshots=False, - interval_end_per_model=None, + start_override_per_model=None, + 
end_override_per_model=None, explain=False, ) @@ -2697,7 +2698,7 @@ def test_plan_start_when_preview_enabled(make_snapshot, mocker: MockerFixture): assert plan_builder.build().start == default_start_for_preview -def test_interval_end_per_model(make_snapshot): +def test_end_override_per_model(make_snapshot): snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds"))) snapshot.categorize_as(SnapshotChangeCategory.BREAKING) @@ -2725,20 +2726,18 @@ def test_interval_end_per_model(make_snapshot): plan_builder = PlanBuilder( context_diff, - interval_end_per_model={snapshot.name: to_timestamp("2023-01-09")}, + end_override_per_model={snapshot.name: to_datetime("2023-01-09")}, ) - assert plan_builder.build().interval_end_per_model == { - snapshot.name: to_timestamp("2023-01-09") - } + assert plan_builder.build().end_override_per_model == {snapshot.name: to_datetime("2023-01-09")} # User-provided end should take precedence. plan_builder = PlanBuilder( context_diff, - interval_end_per_model={snapshot.name: to_timestamp("2023-01-09")}, + end_override_per_model={snapshot.name: to_datetime("2023-01-09")}, end="2023-01-10", is_dev=True, ) - assert plan_builder.build().interval_end_per_model is None + assert plan_builder.build().end_override_per_model is None def test_unaligned_start_model_with_forward_only_preview(make_snapshot): diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index b806b95b75..6e5a1fe43f 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -112,7 +112,6 @@ def test_build_plan_stages_basic( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -224,7 +223,6 @@ def test_build_plan_stages_with_before_all_and_after_all( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, 
execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=environment_statements, @@ -333,7 +331,6 @@ def test_build_plan_stages_select_models( removed_snapshots=[], requires_backfill=True, models_to_backfill={snapshot_a.name}, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -434,7 +431,6 @@ def test_build_plan_stages_basic_no_backfill( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -544,7 +540,6 @@ def test_build_plan_stages_restatement( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -656,7 +651,6 @@ def test_build_plan_stages_forward_only( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -785,7 +779,6 @@ def test_build_plan_stages_forward_only_dev( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -909,7 +902,6 @@ def _get_snapshots(snapshot_ids: t.List[SnapshotId]) -> t.Dict[SnapshotId, Snaps removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -1033,7 +1025,6 @@ def test_build_plan_stages_forward_only_ensure_finalized_snapshots( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ 
-1106,7 +1097,6 @@ def test_build_plan_stages_removed_model( removed_snapshots=[snapshot_b.snapshot_id], requires_backfill=False, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -1188,7 +1178,6 @@ def test_build_plan_stages_environment_suffix_target_changed( removed_snapshots=[], requires_backfill=False, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -1288,7 +1277,6 @@ def test_build_plan_stages_indirect_non_breaking_no_migration( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, @@ -1376,7 +1364,6 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( removed_snapshots=[], requires_backfill=True, models_to_backfill=None, - interval_end_per_model=None, execution_time="2023-01-02", disabled_restatement_models=set(), environment_statements=None, diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index f2f54822f5..ffaee9be74 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -551,6 +551,50 @@ def test_missing_intervals_past_end_date_with_lookback(make_snapshot): assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == [] +def test_missing_intervals_start_override_per_model(make_snapshot: t.Callable[..., Snapshot]): + snapshot = make_snapshot( + load_sql_based_model( + parse(""" + MODEL ( + name a, + kind FULL, + start '2023-02-01', + cron '@daily' + ); + SELECT 1; + """) + ), + version="a", + ) + + # base case - no override + assert list( + missing_intervals(execution_time="2023-02-08 00:05:07", snapshots=[snapshot]).values() + )[0] == [ + (to_timestamp("2023-02-01"), to_timestamp("2023-02-02")), + (to_timestamp("2023-02-02"), 
to_timestamp("2023-02-03")), + (to_timestamp("2023-02-03"), to_timestamp("2023-02-04")), + (to_timestamp("2023-02-04"), to_timestamp("2023-02-05")), + (to_timestamp("2023-02-05"), to_timestamp("2023-02-06")), + (to_timestamp("2023-02-06"), to_timestamp("2023-02-07")), + (to_timestamp("2023-02-07"), to_timestamp("2023-02-08")), + ] + + # with override, should use the overridden start date when calculating missing intervals + assert list( + missing_intervals( + start="1 day ago", + execution_time="2023-02-08 00:05:07", + snapshots=[snapshot], + start_override_per_model={snapshot.name: to_datetime("2023-02-05 00:00:00")}, + ).values() + )[0] == [ + (to_timestamp("2023-02-05"), to_timestamp("2023-02-06")), + (to_timestamp("2023-02-06"), to_timestamp("2023-02-07")), + (to_timestamp("2023-02-07"), to_timestamp("2023-02-08")), + ] + + def test_incremental_time_self_reference(make_snapshot): snapshot = make_snapshot( SqlModel( @@ -2363,7 +2407,7 @@ def test_snapshot_pickle_intervals(make_snapshot): assert len(snapshot.dev_intervals) > 0 -def test_missing_intervals_interval_end_per_model(make_snapshot): +def test_missing_intervals_end_override_per_model(make_snapshot): snapshot_a = make_snapshot( SqlModel( name="a", @@ -2386,9 +2430,9 @@ def test_missing_intervals_interval_end_per_model(make_snapshot): [snapshot_a, snapshot_b], start="2023-01-04", end="2023-01-10", - interval_end_per_model={ - snapshot_a.name: to_timestamp("2023-01-09"), - snapshot_b.name: to_timestamp("2023-01-06"), + end_override_per_model={ + snapshot_a.name: to_datetime("2023-01-09"), + snapshot_b.name: to_datetime("2023-01-06"), }, ) == { snapshot_a: [ @@ -2408,9 +2452,9 @@ def test_missing_intervals_interval_end_per_model(make_snapshot): [snapshot_a, snapshot_b], start="2023-01-08", end="2023-01-08", - interval_end_per_model={ - snapshot_a.name: to_timestamp("2023-01-09"), - snapshot_b.name: to_timestamp( + end_override_per_model={ + snapshot_a.name: to_datetime("2023-01-09"), + snapshot_b.name: 
to_datetime( "2023-01-06" ), # The interval end is before the start. The snapshot will be skipped },