-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathcustom_kind.py
More file actions
32 lines (23 loc) · 911 Bytes
/
custom_kind.py
File metadata and controls
32 lines (23 loc) · 911 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from __future__ import annotations
import typing as t
from sqlmesh import CustomMaterialization, CustomKind, Model
from sqlmesh.utils.pydantic import validate_string
if t.TYPE_CHECKING:
from sqlmesh import QueryOrDF
class ExtendedCustomKind(CustomKind):
    """Custom model kind that exposes one extra materialization property."""

    @property
    def custom_property(self) -> str:
        """Return the ``custom_property`` materialization property.

        The value is looked up under the ``"custom_property"`` key of
        ``materialization_properties`` and passed through ``validate_string``,
        whose result is returned.
        """
        # Whatever `.get` yields (including its "missing key" result) is handed
        # to the validator unchanged.
        raw_value = self.materialization_properties.get("custom_property")
        return validate_string(raw_value)
class CustomFullWithCustomKindMaterialization(CustomMaterialization[ExtendedCustomKind]):
    """Custom materialization parameterized with ``ExtendedCustomKind``."""

    # Identifier of this materialization.
    NAME = "custom_full_with_custom_kind"

    def insert(
        self,
        table_name: str,
        query_or_df: QueryOrDF,
        model: Model,
        is_first_insert: bool,
        render_kwargs: t.Dict[str, t.Any],
        **kwargs: t.Any,
    ) -> None:
        """Check the model's kind, then delegate to ``_replace_query_for_model``.

        Args:
            table_name: Forwarded to ``_replace_query_for_model``.
            query_or_df: Forwarded to ``_replace_query_for_model``.
            model: The model being inserted; its ``kind`` must be an
                ``ExtendedCustomKind`` (checked by class name).
            is_first_insert: Accepted but not used by this implementation.
            render_kwargs: Forwarded to ``_replace_query_for_model``.
            **kwargs: Accepted but not used by this implementation.

        Raises:
            AssertionError: If the class name of ``model.kind`` is not
                ``"ExtendedCustomKind"``.
        """
        # NOTE(review): the kind is compared by class name rather than with
        # isinstance -- presumably deliberate; confirm before changing.
        kind_class_name = type(model.kind).__name__
        assert kind_class_name == "ExtendedCustomKind"

        self._replace_query_for_model(
            model,
            table_name,
            query_or_df,
            render_kwargs,
        )