Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,34 @@ Conceptually, we can group the root level parameters into the following types. E

The rest of this page provides additional detail for some of the configuration options and provides brief examples. Comprehensive lists of configuration options are at the [configuration reference page](../reference/configuration.md).

### Cache directory

By default, the SQLMesh cache is stored in a `.cache` directory within your project folder. You can customize the cache location using the `cache_dir` configuration option:

=== "YAML"

```yaml linenums="1"
# Path relative to the project directory
cache_dir: my_custom_cache

# Alternatively, an absolute path (set `cache_dir` only once per config)
cache_dir: /tmp/sqlmesh_cache

```

=== "Python"

```python linenums="1"
from sqlmesh.core.config import Config, ModelDefaultsConfig

config = Config(
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
cache_dir="/tmp/sqlmesh_cache",
)
```

The cache directory is automatically created if it doesn't exist. You can clear the cache using the `sqlmesh clean` command.

### Table/view storage locations

SQLMesh creates schemas, physical tables, and views in the data warehouse/engine. Learn more about why and how SQLMesh creates schemas in the ["Why does SQLMesh create schemas?" FAQ](../faq/faq.md#schema-question).
Expand Down
1 change: 1 addition & 0 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Configuration options for SQLMesh project directories.
| ------------------ | ------------------------------------------------------------------------------------------------------------------ | :----------: | :------: |
| `ignore_patterns` | Files that match glob patterns specified in this list are ignored when scanning the project folder (Default: `[]`) | list[string] | N |
| `project` | The project name of this config. Used for [multi-repo setups](../guides/multi_repo.md). | string | N |
| `cache_dir` | The directory to store the SQLMesh cache. Can be an absolute path or relative to the project directory. (Default: `.cache`) | string | N |

### Environments

Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/config/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class Config(BaseConfig):
disable_anonymized_analytics: Whether to disable the anonymized analytics collection.
before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
cache_dir: The directory to store the SQLMesh cache. Can be an absolute path or a path relative to the project folder. Defaults to .cache in the project folder.
"""

gateways: GatewayDict = {"": GatewayConfig()}
Expand Down Expand Up @@ -165,6 +166,7 @@ class Config(BaseConfig):
after_all: t.Optional[t.List[str]] = None
linter: LinterConfig = LinterConfig()
janitor: JanitorConfig = JanitorConfig()
cache_dir: t.Optional[str] = None

_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
"gateways": UpdateStrategy.NESTED_UPDATE,
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/config/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def create_state_sync(self, context: GenericContext) -> StateSync:

schema = context.config.get_state_schema(context.gateway)
return EngineAdapterStateSync(
engine_adapter, schema=schema, context_path=context.path, console=context.console
engine_adapter, schema=schema, cache_dir=context.cache_dir, console=context.console
)

def state_sync_fingerprint(self, context: GenericContext) -> str:
Expand Down
27 changes: 25 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,11 @@ def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
}
)

update_model_schemas(self.dag, models=self._models, context_path=self.path)
update_model_schemas(
self.dag,
models=self._models,
cache_dir=self.cache_dir,
)

if model.dialect:
self._all_dialects.add(model.dialect)
Expand Down Expand Up @@ -640,7 +644,11 @@ def load(self, update_schemas: bool = True) -> GenericContext[C]:
self._models.update({fqn: model.copy(update={"mapping_schema": {}})})
continue

update_model_schemas(self.dag, models=self._models, context_path=self.path)
update_model_schemas(
self.dag,
models=self._models,
cache_dir=self.cache_dir,
)

models = self.models.values()
for model in models:
Expand Down Expand Up @@ -2439,6 +2447,9 @@ def clear_caches(self) -> None:
cache_path = path / c.CACHE
if cache_path.exists():
rmtree(cache_path)
if self.cache_dir.exists():
rmtree(self.cache_dir)

if isinstance(self.state_sync, CachingStateSync):
self.state_sync.clear_cache()

Expand Down Expand Up @@ -2538,6 +2549,17 @@ def _model_tables(self) -> t.Dict[str, str]:
for fqn, snapshot in self.snapshots.items()
}

@cached_property
def cache_dir(self) -> Path:
    """Resolve the directory in which this context keeps its cache files.

    Returns:
        The configured ``cache_dir`` when one is set: used as-is if it is an
        absolute path, otherwise resolved against the project path. When no
        value is configured, the default cache folder inside the project path.
    """
    configured = self.config.cache_dir
    if not configured:
        # Nothing configured: fall back to the default cache folder in the project path.
        return self.path / c.CACHE

    # Joining an absolute path onto another path yields the absolute path
    # unchanged, so one join covers both the relative and the absolute case.
    return self.path / Path(configured)

@cached_property
def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
"""Returns all the engine adapters for the gateways defined in the configuration."""
Expand Down Expand Up @@ -2735,6 +2757,7 @@ def _new_selector(
dag=dag,
default_catalog=self.default_catalog,
dialect=self.default_dialect,
cache_dir=self.cache_dir,
)

def _register_notification_targets(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ class _Cache(CacheBase):
def __init__(self, loader: SqlMeshLoader, config_path: Path):
self._loader = loader
self.config_path = config_path
self._model_cache = ModelCache(self.config_path / c.CACHE)
self._model_cache = ModelCache(self._loader.context.cache_dir)

def get_or_load_models(
self, target_path: Path, loader: t.Callable[[], t.List[Model]]
Expand Down
5 changes: 2 additions & 3 deletions sqlmesh/core/model/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from sqlglot.errors import SchemaError
from sqlglot.schema import MappingSchema

from sqlmesh.core import constants as c
from sqlmesh.core.model.cache import (
load_optimized_query_and_mapping,
optimized_query_cache_pool,
Expand All @@ -23,10 +22,10 @@
def update_model_schemas(
dag: DAG[str],
models: UniqueKeyDict[str, Model],
context_path: Path,
cache_dir: Path,
) -> None:
schema = MappingSchema(normalize=False)
optimized_query_cache: OptimizedQueryCache = OptimizedQueryCache(context_path / c.CACHE)
optimized_query_cache: OptimizedQueryCache = OptimizedQueryCache(cache_dir)

_update_model_schemas(dag, models, schema, optimized_query_cache)

Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sqlglot.dialects.dialect import Dialect, DialectType
from sqlglot.helper import seq_get

from sqlmesh.core import constants as c
from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.environment import Environment
from sqlmesh.core.model import update_model_schemas
Expand All @@ -34,10 +35,12 @@ def __init__(
dag: t.Optional[DAG[str]] = None,
default_catalog: t.Optional[str] = None,
dialect: t.Optional[str] = None,
cache_dir: t.Optional[Path] = None,
):
self._state_reader = state_reader
self._models = models
self._context_path = context_path
self._cache_dir = cache_dir if cache_dir else context_path / c.CACHE
self._default_catalog = default_catalog
self._dialect = dialect
self._git_client = GitClient(context_path)
Expand Down Expand Up @@ -157,7 +160,7 @@ def get_model(fqn: str) -> t.Optional[Model]:
models[model.fqn] = model

if needs_update:
update_model_schemas(dag, models=models, context_path=self._context_path)
update_model_schemas(dag, models=models, cache_dir=self._cache_dir)

return models

Expand Down
8 changes: 3 additions & 5 deletions sqlmesh/core/state_sync/db/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,20 @@ class EngineAdapterStateSync(StateSync):
engine_adapter: The EngineAdapter to use to store and fetch snapshots.
schema: The schema to store state metadata in. If None or empty string then no schema is defined
console: The console to log information to.
context_path: The context path, used for caching snapshot models.
cache_dir: The cache path, used for caching snapshot models.
"""

def __init__(
self,
engine_adapter: EngineAdapter,
schema: t.Optional[str],
console: t.Optional[Console] = None,
context_path: Path = Path(),
cache_dir: Path = Path(),
):
self.plan_dags_table = exp.table_("_plan_dags", db=schema)
self.interval_state = IntervalState(engine_adapter, schema=schema)
self.environment_state = EnvironmentState(engine_adapter, schema=schema)
self.snapshot_state = SnapshotState(
engine_adapter, schema=schema, context_path=context_path
)
self.snapshot_state = SnapshotState(engine_adapter, schema=schema, cache_dir=cache_dir)
self.version_state = VersionState(engine_adapter, schema=schema)
self.migrator = StateMigrator(
engine_adapter,
Expand Down
5 changes: 2 additions & 3 deletions sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from sqlglot import exp
from pydantic import Field

from sqlmesh.core import constants as c
from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.state_sync.db.utils import (
snapshot_name_version_filter,
Expand Down Expand Up @@ -53,7 +52,7 @@ def __init__(
self,
engine_adapter: EngineAdapter,
schema: t.Optional[str] = None,
context_path: Path = Path(),
cache_dir: Path = Path(),
):
self.engine_adapter = engine_adapter
self.snapshots_table = exp.table_("_snapshots", db=schema)
Expand All @@ -79,7 +78,7 @@ def __init__(
"next_auto_restatement_ts": exp.DataType.build("bigint"),
}

self._snapshot_cache = SnapshotCache(context_path / c.CACHE)
self._snapshot_cache = SnapshotCache(cache_dir)

def push_snapshots(self, snapshots: t.Iterable[Snapshot], overwrite: bool = False) -> None:
"""Pushes snapshots to the state store.
Expand Down
5 changes: 2 additions & 3 deletions sqlmesh/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import typing as t
import sqlmesh.core.dialect as d
from pathlib import Path
from sqlmesh.core import constants as c
from sqlmesh.core.config import (
Config,
ConnectionConfig,
Expand Down Expand Up @@ -330,8 +329,8 @@ def __init__(
self._yaml_max_mtimes = yaml_max_mtimes

target = t.cast(TargetConfig, project.context.target)
cache_path = loader.config_path / c.CACHE / target.name
self._model_cache = ModelCache(cache_path)
cache_dir = loader.context.cache_dir / target.name
self._model_cache = ModelCache(cache_dir)

def get_or_load_models(
self, target_path: Path, loader: t.Callable[[], t.List[Model]]
Expand Down
11 changes: 10 additions & 1 deletion sqlmesh/dbt/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
profile_name: str,
target: TargetConfig,
variable_overrides: t.Optional[t.Dict[str, t.Any]] = None,
cache_dir: t.Optional[str] = None,
):
self.project_path = project_path
self.profiles_path = profiles_path
Expand All @@ -99,8 +100,16 @@ def __init__(
self._tests_by_owner: t.Dict[str, t.List[TestConfig]] = defaultdict(list)
self._disabled_refs: t.Optional[t.Set[str]] = None
self._disabled_sources: t.Optional[t.Set[str]] = None

if cache_dir is not None:
cache_path = Path(cache_dir)
if not cache_path.is_absolute():
cache_path = self.project_path / cache_path
else:
cache_path = self.project_path / c.CACHE

self._call_cache: FileCache[t.Dict[str, t.List[CallNames]]] = FileCache(
self.project_path / c.CACHE, "jinja_calls"
cache_path, "jinja_calls"
)

self._on_run_start_per_package: t.Dict[str, HookConfigs] = defaultdict(dict)
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def load(cls, context: DbtContext, variables: t.Optional[t.Dict[str, t.Any]] = N
profile_name,
target=profile.target,
variable_overrides=variable_overrides,
cache_dir=context.sqlmesh_config.cache_dir,
)

extra_fields = profile.target.extra
Expand Down
2 changes: 1 addition & 1 deletion tests/core/state_sync/test_export_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def state_sync(tmp_path: Path, example_project_config: Config) -> StateSync:
return EngineAdapterStateSync(
engine_adapter=example_project_config.get_state_connection("main").create_engine_adapter(), # type: ignore
schema=c.SQLMESH,
context_path=tmp_path,
cache_dir=tmp_path / c.CACHE,
)


Expand Down
16 changes: 12 additions & 4 deletions tests/core/state_sync/test_state_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
@pytest.fixture
def state_sync(duck_conn, tmp_path):
state_sync = EngineAdapterStateSync(
create_engine_adapter(lambda: duck_conn, "duckdb"), schema=c.SQLMESH, context_path=tmp_path
create_engine_adapter(lambda: duck_conn, "duckdb"),
schema=c.SQLMESH,
cache_dir=tmp_path / c.CACHE,
)
state_sync.migrate(default_catalog=None)
return state_sync
Expand Down Expand Up @@ -2082,7 +2084,9 @@ def test_version_schema(state_sync: EngineAdapterStateSync, tmp_path) -> None:

# Start with a clean slate.
state_sync = EngineAdapterStateSync(
create_engine_adapter(duckdb.connect, "duckdb"), schema=c.SQLMESH, context_path=tmp_path
create_engine_adapter(duckdb.connect, "duckdb"),
schema=c.SQLMESH,
cache_dir=tmp_path / c.CACHE,
)

with pytest.raises(
Expand Down Expand Up @@ -2203,7 +2207,9 @@ def test_migrate(state_sync: EngineAdapterStateSync, mocker: MockerFixture, tmp_

# Start with a clean slate.
state_sync = EngineAdapterStateSync(
create_engine_adapter(duckdb.connect, "duckdb"), schema=c.SQLMESH, context_path=tmp_path
create_engine_adapter(duckdb.connect, "duckdb"),
schema=c.SQLMESH,
cache_dir=tmp_path / c.CACHE,
)

state_sync.migrate(default_catalog=None)
Expand Down Expand Up @@ -2254,7 +2260,9 @@ def test_rollback(state_sync: EngineAdapterStateSync, mocker: MockerFixture) ->

def test_first_migration_failure(duck_conn, mocker: MockerFixture, tmp_path) -> None:
state_sync = EngineAdapterStateSync(
create_engine_adapter(lambda: duck_conn, "duckdb"), schema=c.SQLMESH, context_path=tmp_path
create_engine_adapter(lambda: duck_conn, "duckdb"),
schema=c.SQLMESH,
cache_dir=tmp_path / c.CACHE,
)
mocker.patch.object(state_sync.migrator, "_migrate_rows", side_effect=Exception("mocked error"))
with pytest.raises(
Expand Down
Loading