diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index b1b06c59f6..52ebdf7793 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -288,6 +288,34 @@ Conceptually, we can group the root level parameters into the following types. E The rest of this page provides additional detail for some of the configuration options and provides brief examples. Comprehensive lists of configuration options are at the [configuration reference page](../reference/configuration.md). +### Cache directory + +By default, the SQLMesh cache is stored in a `.cache` directory within your project folder. You can customize the cache location using the `cache_dir` configuration option: + +=== "YAML" + + ```yaml linenums="1" + # Relative path to project directory + cache_dir: my_custom_cache + + # Absolute path + cache_dir: /tmp/sqlmesh_cache + + ``` + +=== "Python" + + ```python linenums="1" + from sqlmesh.core.config import Config, ModelDefaultsConfig + + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + cache_dir="/tmp/sqlmesh_cache", + ) + ``` + +The cache directory is automatically created if it doesn't exist. You can clear the cache using the `sqlmesh clean` command. + ### Table/view storage locations SQLMesh creates schemas, physical tables, and views in the data warehouse/engine. Learn more about why and how SQLMesh creates schema in the ["Why does SQLMesh create schemas?" FAQ](../faq/faq.md#schema-question). diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index e00c9dee1f..40d0eeb26b 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -20,6 +20,7 @@ Configuration options for SQLMesh project directories. | ------------------ | ------------------------------------------------------------------------------------------------------------------ | :----------: | :------: | | `ignore_patterns` | Files that match glob patterns specified in this list are ignored when scanning the project folder (Default: `[]`) | list[string] | N | | `project` | The project name of this config. Used for [multi-repo setups](../guides/multi_repo.md). | string | N | +| `cache_dir` | The directory to store the SQLMesh cache. Can be an absolute path or relative to the project directory. (Default: `.cache`) | string | N | ### Environments diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index a8b8a2a797..315728aceb 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -120,6 +120,7 @@ class Config(BaseConfig): disable_anonymized_analytics: Whether to disable the anonymized analytics collection. before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands. after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands. + cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder. """ gateways: GatewayDict = {"": GatewayConfig()} @@ -165,6 +166,7 @@ class Config(BaseConfig): after_all: t.Optional[t.List[str]] = None linter: LinterConfig = LinterConfig() janitor: JanitorConfig = JanitorConfig() + cache_dir: t.Optional[str] = None _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { "gateways": UpdateStrategy.NESTED_UPDATE, diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 5cbfc6a71c..69adcafe70 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -105,7 +105,7 @@ def create_state_sync(self, context: GenericContext) -> StateSync: schema = context.config.get_state_schema(context.gateway) return EngineAdapterStateSync( - engine_adapter, schema=schema, context_path=context.path, console=context.console + engine_adapter, schema=schema, cache_dir=context.cache_dir, console=context.console ) def state_sync_fingerprint(self, context: GenericContext) -> str: diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index f7f068d6f9..4203e35739 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -504,7 +504,11 @@ def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model: } ) - update_model_schemas(self.dag, models=self._models, context_path=self.path) + update_model_schemas( + self.dag, + models=self._models, + cache_dir=self.cache_dir, + ) if model.dialect: self._all_dialects.add(model.dialect) @@ -640,7 +644,11 @@ def load(self, update_schemas: bool = True) -> GenericContext[C]: self._models.update({fqn: model.copy(update={"mapping_schema": {}})}) continue - update_model_schemas(self.dag, models=self._models, context_path=self.path) + update_model_schemas( + self.dag, + models=self._models, + cache_dir=self.cache_dir, + ) models = self.models.values() for model in models: @@ -2439,6 +2447,9 @@ def clear_caches(self) -> None: cache_path = path / c.CACHE if cache_path.exists(): rmtree(cache_path) + if self.cache_dir.exists(): + rmtree(self.cache_dir) + if isinstance(self.state_sync, CachingStateSync): self.state_sync.clear_cache() @@ -2538,6 +2549,17 @@ def _model_tables(self) -> t.Dict[str, str]: for fqn, snapshot in self.snapshots.items() } + @cached_property + def cache_dir(self) -> Path: + if self.config.cache_dir: + cache_path = Path(self.config.cache_dir) + if cache_path.is_absolute(): + return cache_path + return self.path / cache_path + + # Default to .cache directory in the project path + return self.path / c.CACHE + @cached_property def engine_adapters(self) -> t.Dict[str, EngineAdapter]: """Returns all the engine adapters for the gateways defined in the configuration.""" @@ -2735,6 +2757,7 @@ def _new_selector( dag=dag, default_catalog=self.default_catalog, dialect=self.default_dialect, + cache_dir=self.cache_dir, ) def _register_notification_targets(self) -> None: diff --git a/sqlmesh/core/loader.py b/sqlmesh/core/loader.py index 41bb8a0aef..90894fd23d 100644 --- a/sqlmesh/core/loader.py +++ b/sqlmesh/core/loader.py @@ -887,7 +887,7 @@ class _Cache(CacheBase): def __init__(self, loader: SqlMeshLoader, config_path: Path): self._loader = loader self.config_path = config_path - self._model_cache = ModelCache(self.config_path / c.CACHE) + self._model_cache = ModelCache(self._loader.context.cache_dir) def get_or_load_models( self, target_path: Path, loader: t.Callable[[], t.List[Model]] diff --git a/sqlmesh/core/model/schema.py b/sqlmesh/core/model/schema.py index 9d3d38da6e..e29cacade0 100644 --- a/sqlmesh/core/model/schema.py +++ b/sqlmesh/core/model/schema.py @@ -7,7 +7,6 @@ from sqlglot.errors import SchemaError from sqlglot.schema import MappingSchema -from sqlmesh.core import constants as c from sqlmesh.core.model.cache import ( load_optimized_query_and_mapping, optimized_query_cache_pool, @@ -23,10 +22,10 @@ def update_model_schemas( dag: DAG[str], models: UniqueKeyDict[str, Model], - context_path: Path, + cache_dir: Path, ) -> None: schema = MappingSchema(normalize=False) - optimized_query_cache: OptimizedQueryCache = OptimizedQueryCache(context_path / c.CACHE) + optimized_query_cache: OptimizedQueryCache = OptimizedQueryCache(cache_dir) _update_model_schemas(dag, models, schema, optimized_query_cache) diff --git a/sqlmesh/core/selector.py b/sqlmesh/core/selector.py index 5d068b5d6a..be460d8ce3 100644 --- a/sqlmesh/core/selector.py +++ b/sqlmesh/core/selector.py @@ -10,6 +10,7 @@ from sqlglot.dialects.dialect import Dialect, DialectType from sqlglot.helper import seq_get +from sqlmesh.core import constants as c from sqlmesh.core.dialect import normalize_model_name from sqlmesh.core.environment import Environment from sqlmesh.core.model import update_model_schemas @@ -34,10 +35,12 @@ def __init__( dag: t.Optional[DAG[str]] = None, default_catalog: t.Optional[str] = None, dialect: t.Optional[str] = None, + cache_dir: t.Optional[Path] = None, ): self._state_reader = state_reader self._models = models self._context_path = context_path + self._cache_dir = cache_dir if cache_dir else context_path / c.CACHE self._default_catalog = default_catalog self._dialect = dialect self._git_client = GitClient(context_path) @@ -157,7 +160,7 @@ def get_model(fqn: str) -> t.Optional[Model]: models[model.fqn] = model if needs_update: - update_model_schemas(dag, models=models, context_path=self._context_path) + update_model_schemas(dag, models=models, cache_dir=self._cache_dir) return models diff --git a/sqlmesh/core/state_sync/db/facade.py b/sqlmesh/core/state_sync/db/facade.py index c0d44893c4..779add1cca 100644 --- a/sqlmesh/core/state_sync/db/facade.py +++ b/sqlmesh/core/state_sync/db/facade.py @@ -79,7 +79,7 @@ class EngineAdapterStateSync(StateSync): engine_adapter: The EngineAdapter to use to store and fetch snapshots. schema: The schema to store state metadata in. If None or empty string then no schema is defined console: The console to log information to. - context_path: The context path, used for caching snapshot models. + cache_dir: The cache path, used for caching snapshot models. """ def __init__( @@ -87,14 +87,12 @@ def __init__( engine_adapter: EngineAdapter, schema: t.Optional[str], console: t.Optional[Console] = None, - context_path: Path = Path(), + cache_dir: Path = Path(), ): self.plan_dags_table = exp.table_("_plan_dags", db=schema) self.interval_state = IntervalState(engine_adapter, schema=schema) self.environment_state = EnvironmentState(engine_adapter, schema=schema) - self.snapshot_state = SnapshotState( - engine_adapter, schema=schema, context_path=context_path - ) + self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir) self.version_state = VersionState(engine_adapter, schema=schema) self.migrator = StateMigrator( engine_adapter, diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 0cf954071d..5b6d96d970 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -8,7 +8,6 @@ from sqlglot import exp from pydantic import Field -from sqlmesh.core import constants as c from sqlmesh.core.engine_adapter import EngineAdapter from sqlmesh.core.state_sync.db.utils import ( snapshot_name_version_filter, @@ -53,7 +52,7 @@ def __init__( self, engine_adapter: EngineAdapter, schema: t.Optional[str] = None, - context_path: Path = Path(), + cache_dir: Path = Path(), ): self.engine_adapter = engine_adapter self.snapshots_table = exp.table_("_snapshots", db=schema) @@ -79,7 +78,7 @@ def __init__( "next_auto_restatement_ts": exp.DataType.build("bigint"), } - self._snapshot_cache = SnapshotCache(context_path / c.CACHE) + self._snapshot_cache = SnapshotCache(cache_dir) def push_snapshots(self, snapshots: t.Iterable[Snapshot], overwrite: bool = False) -> None: """Pushes snapshots to the state store. diff --git a/sqlmesh/dbt/loader.py b/sqlmesh/dbt/loader.py index 672ad1ac3e..0f896d5bec 100644 --- a/sqlmesh/dbt/loader.py +++ b/sqlmesh/dbt/loader.py @@ -5,7 +5,6 @@ import typing as t import sqlmesh.core.dialect as d from pathlib import Path -from sqlmesh.core import constants as c from sqlmesh.core.config import ( Config, ConnectionConfig, @@ -330,8 +329,8 @@ def __init__( self._yaml_max_mtimes = yaml_max_mtimes target = t.cast(TargetConfig, project.context.target) - cache_path = loader.config_path / c.CACHE / target.name - self._model_cache = ModelCache(cache_path) + cache_dir = loader.context.cache_dir / target.name + self._model_cache = ModelCache(cache_dir) def get_or_load_models( self, target_path: Path, loader: t.Callable[[], t.List[Model]] diff --git a/sqlmesh/dbt/manifest.py b/sqlmesh/dbt/manifest.py index 49827e68a7..19795a0b9b 100644 --- a/sqlmesh/dbt/manifest.py +++ b/sqlmesh/dbt/manifest.py @@ -77,6 +77,7 @@ def __init__( profile_name: str, target: TargetConfig, variable_overrides: t.Optional[t.Dict[str, t.Any]] = None, + cache_dir: t.Optional[str] = None, ): self.project_path = project_path self.profiles_path = profiles_path @@ -99,8 +100,16 @@ def __init__( self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list) self._disabled_refs: t.Optional[t.Set[str]] = None self._disabled_sources: t.Optional[t.Set[str]] = None + + if cache_dir is not None: + cache_path = Path(cache_dir) + if not cache_path.is_absolute(): + cache_path = self.project_path / cache_path + else: + cache_path = self.project_path / c.CACHE + self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache( - self.project_path / c.CACHE, "jinja_calls" + cache_path, "jinja_calls" ) self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict) diff --git a/sqlmesh/dbt/project.py b/sqlmesh/dbt/project.py index e491736086..ac36ee4e0a 100644 --- a/sqlmesh/dbt/project.py +++ b/sqlmesh/dbt/project.py @@ -75,6 +75,7 @@ def load(cls, context: DbtContext, variables: t.Optional[t.Dict[str, t.Any]] = N profile_name, target=profile.target, variable_overrides=variable_overrides, + cache_dir=context.sqlmesh_config.cache_dir, ) extra_fields = profile.target.extra diff --git a/tests/core/state_sync/test_export_import.py b/tests/core/state_sync/test_export_import.py index 0b22656d1e..2d20199d33 100644 --- a/tests/core/state_sync/test_export_import.py +++ b/tests/core/state_sync/test_export_import.py @@ -33,7 +33,7 @@ def state_sync(tmp_path: Path, example_project_config: Config) -> StateSync: return EngineAdapterStateSync( engine_adapter=example_project_config.get_state_connection("main").create_engine_adapter(), # type: ignore schema=c.SQLMESH, - context_path=tmp_path, + cache_dir=tmp_path / c.CACHE, ) diff --git a/tests/core/state_sync/test_state_sync.py b/tests/core/state_sync/test_state_sync.py index dd68b5c515..cf1d35bbfc 100644 --- a/tests/core/state_sync/test_state_sync.py +++ b/tests/core/state_sync/test_state_sync.py @@ -55,7 +55,9 @@ @pytest.fixture def state_sync(duck_conn, tmp_path): state_sync = EngineAdapterStateSync( - create_engine_adapter(lambda: duck_conn, "duckdb"), schema=c.SQLMESH, context_path=tmp_path + create_engine_adapter(lambda: duck_conn, "duckdb"), + schema=c.SQLMESH, + cache_dir=tmp_path / c.CACHE, ) state_sync.migrate(default_catalog=None) return state_sync @@ -2082,7 +2084,9 @@ def test_version_schema(state_sync: EngineAdapterStateSync, tmp_path) -> None: # Start with a clean slate. state_sync = EngineAdapterStateSync( - create_engine_adapter(duckdb.connect, "duckdb"), schema=c.SQLMESH, context_path=tmp_path + create_engine_adapter(duckdb.connect, "duckdb"), + schema=c.SQLMESH, + cache_dir=tmp_path / c.CACHE, ) with pytest.raises( @@ -2203,7 +2207,9 @@ def test_migrate(state_sync: EngineAdapterStateSync, mocker: MockerFixture, tmp_ # Start with a clean slate. state_sync = EngineAdapterStateSync( - create_engine_adapter(duckdb.connect, "duckdb"), schema=c.SQLMESH, context_path=tmp_path + create_engine_adapter(duckdb.connect, "duckdb"), + schema=c.SQLMESH, + cache_dir=tmp_path / c.CACHE, ) state_sync.migrate(default_catalog=None) @@ -2254,7 +2260,9 @@ def test_rollback(state_sync: EngineAdapterStateSync, mocker: MockerFixture) -> def test_first_migration_failure(duck_conn, mocker: MockerFixture, tmp_path) -> None: state_sync = EngineAdapterStateSync( - create_engine_adapter(lambda: duck_conn, "duckdb"), schema=c.SQLMESH, context_path=tmp_path + create_engine_adapter(lambda: duck_conn, "duckdb"), + schema=c.SQLMESH, + cache_dir=tmp_path / c.CACHE, ) mocker.patch.object(state_sync.migrator, "_migrate_rows", side_effect=Exception("mocked error")) with pytest.raises( diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 7024e9f73f..a8d2d04da9 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -652,6 +652,96 @@ def test_clear_caches(tmp_path: pathlib.Path): assert not cache_dir.exists() +def test_cache_path_configurations(tmp_path: pathlib.Path): + project_dir = tmp_path / "project" + project_dir.mkdir(parents=True) + config_file = project_dir / "config.yaml" + + # Test relative path + config_file.write_text("model_defaults:\n dialect: duckdb\ncache_dir: .my_cache") + context = Context(paths=str(project_dir)) + assert context.cache_dir == project_dir / ".my_cache" + + # Test absolute path + abs_cache = tmp_path / "abs_cache" + config_file.write_text(f"model_defaults:\n dialect: duckdb\ncache_dir: {abs_cache}") + context = Context(paths=str(project_dir)) + assert context.cache_dir == abs_cache + + # Test default + config_file.write_text("model_defaults:\n dialect: duckdb") + context = Context(paths=str(project_dir)) + assert context.cache_dir == project_dir / ".cache" + + +def test_plan_apply_populates_cache(copy_to_temp_path, mocker): + sushi_paths = copy_to_temp_path("examples/sushi") + sushi_path = sushi_paths[0] + custom_cache_dir = sushi_path.parent / "custom_cache" + + # Modify the existing config.py to add cache_dir to test_config + config_py_path = sushi_path / "config.py" + with open(config_py_path, "r") as f: + config_content = f.read() + + # Add cache_dir to the test_config definition + config_content = config_content.replace( + 'test_config = Config(\n gateways={"in_memory": GatewayConfig(connection=DuckDBConnectionConfig())},\n default_gateway="in_memory",\n plan=PlanConfig(\n auto_categorize_changes=CategorizerConfig(\n sql=AutoCategorizationMode.SEMI, python=AutoCategorizationMode.OFF\n )\n ),\n model_defaults=model_defaults,\n)', + f"""test_config = Config( + gateways={{"in_memory": GatewayConfig(connection=DuckDBConnectionConfig())}}, + default_gateway="in_memory", + plan=PlanConfig( + auto_categorize_changes=CategorizerConfig( + sql=AutoCategorizationMode.SEMI, python=AutoCategorizationMode.OFF + ) + ), + model_defaults=model_defaults, + cache_dir="{custom_cache_dir.as_posix()}", +)""", + ) + + with open(config_py_path, "w") as f: + f.write(config_content) + + # Create context with the test config + context = Context(paths=sushi_path, config="test_config") + custom_cache_dir = context.cache_dir + assert "custom_cache" in str(custom_cache_dir) + assert (custom_cache_dir / "optimized_query").exists() + assert (custom_cache_dir / "model_definition").exists() + assert not (custom_cache_dir / "snapshot").exists() + + # Clear the cache + context.clear_caches() + assert not custom_cache_dir.exists() + + plan = context.plan("dev", create_from="prod", skip_tests=True) + context.apply(plan) + + # Cache directory should now exist again + assert custom_cache_dir.exists() + assert any(custom_cache_dir.iterdir()) + + # Since the cache has been deleted post loading here only snapshot should exist + assert (custom_cache_dir / "snapshot").exists() + assert not (custom_cache_dir / "optimized_query").exists() + assert not (custom_cache_dir / "model_definition").exists() + + # New context should load same models and create the cache for optimized_query and model_definition + initial_model_count = len(context.models) + context2 = Context(paths=context.path, config="test_config") + cached_model_count = len(context2.models) + + assert initial_model_count == cached_model_count > 0 + assert (custom_cache_dir / "optimized_query").exists() + assert (custom_cache_dir / "model_definition").exists() + assert (custom_cache_dir / "snapshot").exists() + + # Clear caches should remove the custom cache directory + context.clear_caches() + assert not custom_cache_dir.exists() + + def test_ignore_files(mocker: MockerFixture, tmp_path: pathlib.Path): mocker.patch.object( sqlmesh.core.constants, @@ -1831,7 +1921,7 @@ def assert_cached_violations_exist(cache: OptimizedQueryCache, model: Model): ctx.plan_builder("dev") # Case: Ensure error violations are cached if the model did not pass linting - cache = OptimizedQueryCache(tmp_path / c.CACHE) + cache = OptimizedQueryCache(ctx.cache_dir) assert_cached_violations_exist(cache, error_model) diff --git a/web/server/watcher.py b/web/server/watcher.py index 0474059e9f..588f6c5e22 100644 --- a/web/server/watcher.py +++ b/web/server/watcher.py @@ -30,7 +30,10 @@ async def watch_project() -> None: (settings.project_path / c.SEEDS).resolve(), ] ignore_dirs = [".env"] - ignore_paths: t.List[t.Union[str, Path]] = [(settings.project_path / c.CACHE).resolve()] + cache_path = ( + context.cache_dir.resolve() if context else (settings.project_path / c.CACHE).resolve() + ) + ignore_paths: t.List[t.Union[str, Path]] = [cache_path] ignore_entity_patterns = context.config.ignore_patterns if context else c.IGNORE_PATTERNS ignore_entity_patterns.append("^.*\\.db(\\.wal)?$")