Skip to content

Commit eebce74

Browse files
feat(backend): route use_sea=True through ADBC-Rust kernel via PyO3
Adds a new backend, `AdbcDatabricksClient`, that delegates query execution to the `databricks_adbc_pyo3` extension module (PyO3 bindings over the Databricks ADBC Rust kernel). When `use_sea=True` is passed to `sql.connect`, requests now flow through the Rust kernel instead of the existing Python-SEA backend. This is the Python-side companion to the satellite PyO3 binding being prototyped in adbc-drivers/databricks#423. **Draft** while that binding is not yet on PyPI — `import databricks_adbc_pyo3` will fail unless the binding is installed locally via `maturin develop`. What's wired through the public API: - sql.connect(..., use_sea=True) → Rust kernel - cursor.execute(...) → SEA + CloudFetch - cursor.fetchone() / fetchmany(n) / fetchall() → Row tuples - cursor.fetchall_arrow() / fetchmany_arrow(n) → pyarrow.Table - cursor.description → PEP-249 7-tuples - iteration (`for row in cursor`), context mgrs What is NOT yet wired (raises NotImplementedError): - Parameterized queries (`parameters=[...]`) - Async execution (`async_op=True`) - Metadata methods (catalogs, schemas, tables, columns) Auth: PAT only for now; OAuth M2M / U2M / Azure SP / external credential providers are not yet plumbed through the Rust binding. Code layout: src/databricks/sql/backend/adbc/ __init__.py — re-exports AdbcDatabricksClient client.py — DatabricksClient impl, delegates to PyO3 result_set.py — ResultSet impl over the streaming PyO3 ResultSet, with batch buffering for fetchone / fetchmany. The old `backend/sea/` tree is left in place and unreachable from sql.connect; deletion is a separate cleanup once this backend reaches parity.
1 parent cbd6a88 commit eebce74

4 files changed

Lines changed: 458 additions & 9 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""ADBC-Rust-kernel-backed backend for databricks-sql-python (POC).
2+
3+
Wraps the PyO3 binding `databricks_adbc_pyo3` and adapts it to the
4+
`DatabricksClient` / `ResultSet` interfaces used by the rest of the connector.
5+
"""
6+
7+
from databricks.sql.backend.adbc.client import AdbcDatabricksClient
8+
9+
__all__ = ["AdbcDatabricksClient"]
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
"""DatabricksClient backed by the Rust ADBC kernel via PyO3 (POC).
2+
3+
Implements the connector's `DatabricksClient` interface by delegating to the
4+
`databricks_adbc_pyo3` extension module, which loads the Rust kernel
5+
(`databricks-adbc`) in-process. PAT-only for now; metadata and async
6+
operations raise NotImplementedError.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import logging
12+
import uuid
13+
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
14+
15+
from databricks.sql.backend.databricks_client import DatabricksClient
16+
from databricks.sql.backend.types import (
17+
BackendType,
18+
CommandId,
19+
CommandState,
20+
SessionId,
21+
)
22+
from databricks.sql.backend.adbc.result_set import AdbcResultSet
23+
from databricks.sql.exc import DatabaseError, OperationalError, ProgrammingError
24+
from databricks.sql.thrift_api.TCLIService import ttypes
25+
26+
if TYPE_CHECKING:
27+
from databricks.sql.client import Cursor
28+
from databricks.sql.result_set import ResultSet
29+
30+
logger = logging.getLogger(__name__)

# Fail fast at module import time: this whole backend is unusable without the
# PyO3 extension, and a clear install hint here beats a confusing NameError on
# `_rust_kernel` later. Note this means importing the `backend.adbc` package
# itself (whose __init__ imports this module) fails when the binding is absent
# — intentional while the binding is not yet published on PyPI.
try:
    import databricks_adbc_pyo3 as _rust_kernel
except ImportError as exc:  # pragma: no cover - import-time error surfaces clearly
    raise ImportError(
        "use_sea=True requires the databricks_adbc_pyo3 extension. Install it from "
        "the databricks-adbc/rust-pyo3 directory with `maturin develop --release` "
        "in your venv."
    ) from exc
40+
41+
42+
class AdbcDatabricksClient(DatabricksClient):
    """DatabricksClient that routes execution through the Rust ADBC kernel.

    Construction does not open a Rust connection — that happens in
    `open_session` so the same Session lifecycle that today gates Thrift's
    `TOpenSession` gates the Rust kernel's connection setup too.

    POC limitations: PAT auth only, synchronous execution only, no parameter
    binding, and no metadata operations (those raise NotImplementedError).
    """

    # The kernel manages its own CloudFetch parallelism; this value is only
    # consulted by Thrift code paths that don't run for use_sea=True.
    _DEFAULT_MAX_DOWNLOAD_THREADS = 10

    def __init__(
        self,
        server_hostname: str,
        http_path: str,
        access_token: Optional[str] = None,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        **kwargs,
    ):
        """Store connection parameters; no network I/O happens here.

        Args:
            server_hostname: Databricks workspace hostname.
            http_path: SQL warehouse HTTP path.
            access_token: Personal access token (required; PAT-only POC).
            catalog: Default catalog applied when `open_session` gets none.
            schema: Default schema applied when `open_session` gets none.
            **kwargs: Accepted for signature compatibility with the other
                backends; everything else is ignored by this POC.

        Raises:
            ProgrammingError: If no access token is supplied.
        """
        if not access_token:
            raise ProgrammingError(
                "AdbcDatabricksClient (use_sea=True) currently supports only PAT auth. "
                "Pass access_token=<dapi...>."
            )
        # Auth provider is built upstream but the Rust kernel re-does PAT auth itself,
        # so we just need the raw token here.
        self._server_hostname = server_hostname
        self._http_path = http_path
        self._access_token = access_token
        self._initial_catalog = catalog
        self._initial_schema = schema

        # Per-session state. We support a single open session at a time; opening
        # a second one will raise. Matches the current Session lifecycle.
        self._connection: Optional[_rust_kernel.Connection] = None
        self._session_id: Optional[SessionId] = None

    # ----- session lifecycle -----

    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> SessionId:
        """Open a Rust-kernel connection and return a synthetic session id.

        Args:
            session_configuration: Ignored in the POC (a warning is logged).
            catalog: Session catalog; falls back to the constructor's value.
            schema: Session schema; falls back to the constructor's value.

        Raises:
            OperationalError: If a session is already open, or if the Rust
                kernel fails to connect.
        """
        if self._connection is not None:
            raise OperationalError("AdbcDatabricksClient already has an open session.")
        if session_configuration:
            logger.warning(
                "AdbcDatabricksClient ignores session_configuration in POC: %s",
                list(session_configuration.keys()),
            )
        try:
            self._connection = _rust_kernel.Connection(
                self._server_hostname,
                self._http_path,
                self._access_token,
                catalog=catalog or self._initial_catalog,
                schema=schema or self._initial_schema,
            )
        except RuntimeError as exc:
            raise OperationalError(f"Failed to open Rust ADBC session: {exc}") from exc

        # Mint a synthetic SEA-style session id; the kernel manages real session
        # lifecycle internally and does not surface its session GUID today.
        self._session_id = SessionId.from_sea_session_id(str(uuid.uuid4()))
        logger.info("Opened ADBC-Rust session %s", self._session_id)
        return self._session_id

    def close_session(self, session_id: SessionId) -> None:
        """Release the Rust-side connection. Idempotent on a closed client."""
        if self._connection is None:
            return
        # Defensive: flag callers handing us a session id we never minted, so a
        # stale/foreign id tearing down the one live connection is at least
        # visible in logs. Close anyway to preserve the original semantics.
        if self._session_id is not None and session_id != self._session_id:
            logger.warning(
                "close_session called with %s but the open session is %s; closing anyway",
                session_id,
                self._session_id,
            )
        # PyO3 Connection has no explicit close in the POC — drop the reference
        # and let Drop release the Rust-side resources.
        self._connection = None
        self._session_id = None

    # ----- query execution -----

    def execute_command(
        self,
        operation: str,
        session_id: SessionId,
        max_rows: int,
        max_bytes: int,
        lz4_compression: bool,
        cursor: "Cursor",
        use_cloud_fetch: bool,
        parameters: List[ttypes.TSparkParameter],
        async_op: bool,
        enforce_embedded_schema_correctness: bool,
        row_limit: Optional[int] = None,
        query_tags: Optional[Dict[str, Optional[str]]] = None,
    ) -> Union["ResultSet", None]:
        """Execute `operation` synchronously through the Rust kernel.

        `max_rows`, `max_bytes`, `lz4_compression`, `use_cloud_fetch` and
        `enforce_embedded_schema_correctness` are accepted for interface
        compatibility; fetch behavior is managed inside the kernel.

        Returns:
            An `AdbcResultSet` wrapping the kernel's streaming result.

        Raises:
            OperationalError: If no session is open.
            NotImplementedError: For `async_op=True` or bound `parameters`.
            DatabaseError: If the Rust kernel reports an execution failure.
        """
        if self._connection is None:
            raise OperationalError("Cannot execute_command on closed session.")
        if async_op:
            raise NotImplementedError(
                "async_op is not supported by the Rust ADBC backend (POC)."
            )
        if parameters:
            raise NotImplementedError(
                "Parameter binding is not supported by the Rust ADBC backend (POC)."
            )
        # Don't silently drop options that change the user-visible result set
        # or query observability.
        if row_limit is not None:
            logger.warning(
                "row_limit=%s is ignored by the Rust ADBC backend (POC).", row_limit
            )
        if query_tags:
            logger.warning("query_tags are ignored by the Rust ADBC backend (POC).")

        try:
            rs = self._connection.execute(operation)
        except RuntimeError as exc:
            raise DatabaseError(f"Rust ADBC execution failed: {exc}") from exc

        # The kernel does not surface its statement_id today; mint a synthetic one.
        command_id = CommandId.from_sea_statement_id(str(uuid.uuid4()))
        cursor.active_command_id = command_id

        return AdbcResultSet(
            connection=cursor.connection,
            backend=self,
            rust_result_set=rs,
            command_id=command_id,
            arraysize=cursor.arraysize,
            buffer_size_bytes=cursor.buffer_size_bytes,
        )

    def cancel_command(self, command_id: CommandId) -> None:
        """No-op: execution is fully synchronous in the POC.

        The result is materialized before `execute_command` returns, so there
        is nothing left to cancel after the fact.
        """
        logger.debug("cancel_command is a no-op in the Rust ADBC POC backend")

    def close_command(self, command_id: CommandId) -> None:
        """No-op: the result set is already drained on the Rust side."""
        logger.debug("close_command is a no-op in the Rust ADBC POC backend")

    def get_query_state(self, command_id: CommandId) -> CommandState:
        """Always SUCCEEDED: commands run synchronously before returning."""
        return CommandState.SUCCEEDED

    def get_execution_result(
        self,
        command_id: CommandId,
        cursor: "Cursor",
    ) -> "ResultSet":
        """Unsupported: only meaningful with async execution.

        Raises:
            NotImplementedError: Always, in the POC.
        """
        raise NotImplementedError(
            "get_execution_result requires async execution (not supported in POC)."
        )

    # ----- metadata (not yet wired) -----

    def get_catalogs(self, *args, **kwargs):
        raise NotImplementedError("get_catalogs is not supported by the Rust ADBC backend (POC).")

    def get_schemas(self, *args, **kwargs):
        raise NotImplementedError("get_schemas is not supported by the Rust ADBC backend (POC).")

    def get_tables(self, *args, **kwargs):
        raise NotImplementedError("get_tables is not supported by the Rust ADBC backend (POC).")

    def get_columns(self, *args, **kwargs):
        raise NotImplementedError("get_columns is not supported by the Rust ADBC backend (POC).")

    @property
    def max_download_threads(self) -> int:
        """Download-thread budget for CloudFetch-style code paths.

        The kernel manages its own CloudFetch parallelism; this property is
        only consulted by Thrift code paths that don't run for use_sea=True.
        """
        return self._DEFAULT_MAX_DOWNLOAD_THREADS

0 commit comments

Comments
 (0)