Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ workflows:
- bigquery
- clickhouse-cloud
- athena
filters:
branches:
only:
- main
# filters:
# branches:
# only:
# - main
Comment thread
VaggelisD marked this conversation as resolved.
Outdated
- trigger_private_tests:
requires:
- style_and_cicd_tests
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/engine_adapter/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
SCHEMA_DIFFER = TrinoEngineAdapter.SCHEMA_DIFFER
MAX_TIMESTAMP_PRECISION = 3 # copied from Trino
# Athena does not deal with comments well, e.g:
# >>> self._execute('/* test */ DESCRIBE foo')
# pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
ATTACH_CORRELATION_ID = False

def __init__(
self, *args: t.Any, s3_warehouse_location: t.Optional[str] = None, **kwargs: t.Any
Expand Down
9 changes: 7 additions & 2 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class EngineAdapter:
DEFAULT_CATALOG_TYPE = DIALECT
QUOTE_IDENTIFIERS_IN_VIEWS = True
MAX_IDENTIFIER_LENGTH: t.Optional[int] = None
ATTACH_CORRELATION_ID = True

def __init__(
self,
Expand Down Expand Up @@ -2219,8 +2220,7 @@ def execute(
else:
sql = t.cast(str, e)

if self.correlation_id:
sql = f"/* {self.correlation_id} */ {sql}"
sql = self._attach_correlation_id(sql)

self._log_sql(
sql,
Expand All @@ -2229,6 +2229,11 @@ def execute(
)
self._execute(sql, **kwargs)

def _attach_correlation_id(self, sql: str) -> str:
if self.ATTACH_CORRELATION_ID and self.correlation_id:
return f"/* {self.correlation_id} */ {sql}"
return sql

def _log_sql(
self,
sql: str,
Expand Down