-
Notifications
You must be signed in to change notification settings - Fork 143
Expand file tree
/
Copy pathtest_transactions.py
More file actions
772 lines (633 loc) · 30.5 KB
/
test_transactions.py
File metadata and controls
772 lines (633 loc) · 30.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
"""
End-to-end integration tests for Multi-Statement Transaction (MST) APIs.
Tests driver behavior for MST across:
- Basic correctness (commit/rollback/isolation/multi-table)
- API-specific (autocommit, isolation level, error handling)
- Metadata RPCs inside transactions (non-transactional freshness)
- SQL statements blocked by MSTCheckRule (SHOW, DESCRIBE, information_schema)
- Execute variants (executemany)
Parallelisation:
- Each test uses its own unique table (derived from test name) to allow
parallel execution with pytest-xdist.
- Tests requiring multiple concurrent connections to the same table are
tagged with xdist_group so that, when run with ``--dist loadgroup``, each
such test is scheduled as its own group on a single worker (the mark has
no effect under other ``--dist`` modes).
Requirements:
- DBSQL warehouse that supports Multi-Statement Transactions (MST)
- Env vars: DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH,
DATABRICKS_TOKEN, DATABRICKS_CATALOG, DATABRICKS_SCHEMA
"""
import logging
import os
import re
import uuid
import pytest
import databricks.sql as sql
logger = logging.getLogger(__name__)


def _unique_table_name(request):
    """Derive a per-test Delta table name from the test's node name.

    The sanitized test name keeps leftover tables recognisable when
    debugging. The random suffix keeps the name unique even when two test
    sessions (e.g. parallel CI jobs) target the same catalog/schema at the
    same time; otherwise they would DROP/CREATE each other's tables.

    Args:
        request: the pytest ``request`` fixture; only ``request.node.name``
            is read.

    Returns:
        A lowercase ``[a-z0-9_]`` table name of at most 80 characters.
    """
    sanitized = re.sub(r"[^a-z0-9_]", "_", request.node.name.lower())
    # 10 ("mst_pysql_") + 61 + 1 ("_") + 8 (suffix) == 80. Truncate the
    # descriptive part, never the suffix, so uniqueness survives long names.
    return f"mst_pysql_{sanitized[:61]}_{uuid.uuid4().hex[:8]}"
def _unique_table_name_raw(suffix):
    """Build a randomized table name for auxiliary tables inside a test.

    Intended for tables a test needs in addition to the one supplied by the
    ``mst_table`` fixture; *suffix* is embedded verbatim (not sanitized).
    """
    token = uuid.uuid4().hex[:8]
    return f"mst_pysql_{suffix}_{token}"
@pytest.fixture
def mst_conn_params(connection_details):
    """Keyword arguments for ``sql.connect`` with MST support enabled.

    ``ignore_transactions`` is set explicitly to False so the transaction
    APIs exercised by this module are active on every connection.
    """
    details = connection_details
    params = dict(
        server_hostname=details["host"],
        http_path=details["http_path"],
        access_token=details.get("access_token"),
        ignore_transactions=False,
    )
    return params
@pytest.fixture
def mst_catalog(connection_details):
    """Catalog for test tables.

    Resolution order: ``connection_details["catalog"]``, then the
    ``DATABRICKS_CATALOG`` environment variable, then ``"main"``; empty
    values fall through to the next source.
    """
    for candidate in (connection_details.get("catalog"), os.getenv("DATABRICKS_CATALOG")):
        if candidate:
            return candidate
    return "main"
@pytest.fixture
def mst_schema(connection_details):
    """Schema for test tables.

    Resolution order: ``connection_details["schema"]``, then the
    ``DATABRICKS_SCHEMA`` environment variable, then ``"default"``; empty
    values fall through to the next source.
    """
    for candidate in (connection_details.get("schema"), os.getenv("DATABRICKS_SCHEMA")):
        if candidate:
            return candidate
    return "default"
@pytest.fixture
def mst_table(request, mst_conn_params, mst_catalog, mst_schema):
    """Provide an empty per-test Delta table and drop it on teardown.

    Yields ``(fq_table, table_name)`` where ``fq_table`` is
    ``catalog.schema.table``. The table is created with the
    ``catalogManaged`` table feature so it can take part in MST. A failed
    teardown DROP is logged as a warning instead of failing the test.
    """
    table_name = _unique_table_name(request)
    fq_table = f"{mst_catalog}.{mst_schema}.{table_name}"
    create_ddl = (
        f"CREATE TABLE {fq_table} (id INT, value STRING) USING DELTA "
        f"TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported')"
    )

    # Setup: start from a clean slate in case an earlier run left the table behind.
    with sql.connect(**mst_conn_params) as setup_conn, setup_conn.cursor() as cur:
        cur.execute(f"DROP TABLE IF EXISTS {fq_table}")
        cur.execute(create_ddl)

    yield fq_table, table_name

    # Teardown: best-effort removal.
    try:
        with sql.connect(**mst_conn_params) as cleanup_conn, cleanup_conn.cursor() as cur:
            cur.execute(f"DROP TABLE IF EXISTS {fq_table}")
    except Exception as e:
        logger.warning(f"Failed to drop {fq_table}: {e}")
def _get_row_count(mst_conn_params, fq_table):
    """Return ``COUNT(*)`` of *fq_table*, read through a brand-new connection.

    Using a separate connection keeps the read outside any transaction the
    caller may still have open.
    """
    with sql.connect(**mst_conn_params) as conn, conn.cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {fq_table}")
        first_row = cur.fetchone()
        return first_row[0]
def _get_ids(mst_conn_params, fq_table):
    """Return the set of ``id`` values in *fq_table*, read via a new connection."""
    with sql.connect(**mst_conn_params) as conn, conn.cursor() as cur:
        cur.execute(f"SELECT id FROM {fq_table}")
        rows = cur.fetchall()
    return set(row[0] for row in rows)
# ==================== A. BASIC CORRECTNESS ====================
class TestMstCorrectness:
    """Core MST correctness: commit, rollback, isolation, multi-table.

    Each test drives a connection with ``autocommit = False`` and checks the
    outcome from a separate, fresh connection via ``_get_row_count`` /
    ``_get_ids``.
    """

    @staticmethod
    def _create_extra_table(mst_conn_params, fq_table):
        """(Re)create an additional MST-capable Delta table in autocommit mode."""
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"DROP TABLE IF EXISTS {fq_table}")
                cursor.execute(
                    f"CREATE TABLE {fq_table} (id INT, value STRING) USING DELTA "
                    f"TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported')"
                )

    @staticmethod
    def _drop_table_quietly(mst_conn_params, fq_table):
        """Best-effort DROP of *fq_table* from a fresh autocommit connection.

        A dedicated connection is used so cleanup never depends on the state
        of a (possibly still transactional) test connection: assigning
        ``autocommit = True`` while a transaction is open raises (see
        ``TestMstApi.test_set_autocommit_during_active_txn_throws``), which
        would both leak the table and mask the test's real failure. Errors
        are logged, not raised.
        """
        try:
            with sql.connect(**mst_conn_params) as conn:
                with conn.cursor() as cursor:
                    cursor.execute(f"DROP TABLE IF EXISTS {fq_table}")
        except Exception as e:
            logger.warning(f"Failed to drop {fq_table}: {e}")

    def test_commit_single_insert(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'committed')")
            conn.commit()
        assert _get_row_count(mst_conn_params, fq_table) == 1

    def test_commit_multiple_inserts(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'a')")
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'b')")
                cursor.execute(f"INSERT INTO {fq_table} VALUES (3, 'c')")
            conn.commit()
        assert _get_row_count(mst_conn_params, fq_table) == 3

    def test_rollback_single_insert(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'rolled_back')")
            conn.rollback()
        assert _get_row_count(mst_conn_params, fq_table) == 0

    def test_sequential_transactions(self, mst_conn_params, mst_table):
        """Commit, rollback, commit on one connection: only txn1 and txn3 persist."""
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'txn1')")
            conn.commit()
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'txn2')")
            conn.rollback()
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (3, 'txn3')")
            conn.commit()
        # Check exact ids, not just a count of 2: a swapped commit/rollback
        # (e.g. {1, 2}) would otherwise pass unnoticed.
        assert _get_ids(mst_conn_params, fq_table) == {1, 3}

    def test_auto_start_after_commit(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'txn1')")
            conn.commit()
            # Second INSERT auto-starts a new transaction
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'txn2')")
            conn.rollback()
        assert _get_ids(mst_conn_params, fq_table) == {1}

    def test_auto_start_after_rollback(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'txn1')")
            conn.rollback()
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'txn2')")
            conn.commit()
        assert _get_ids(mst_conn_params, fq_table) == {2}

    def test_update_in_transaction(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            # Seed row in autocommit mode, then update it transactionally.
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'original')")
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"UPDATE {fq_table} SET value = 'updated' WHERE id = 1")
            conn.commit()
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT value FROM {fq_table} WHERE id = 1")
                assert cursor.fetchone()[0] == "updated"

    def test_delete_in_transaction(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'a')")
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'b')")
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"DELETE FROM {fq_table} WHERE id = 1")
            conn.commit()
        assert _get_row_count(mst_conn_params, fq_table) == 1

    def test_multi_table_commit(self, mst_conn_params, mst_table, mst_catalog, mst_schema):
        fq_table1, _ = mst_table
        fq_table2 = f"{mst_catalog}.{mst_schema}.{_unique_table_name_raw('multi_commit_t2')}"
        self._create_extra_table(mst_conn_params, fq_table2)
        try:
            with sql.connect(**mst_conn_params) as conn:
                conn.autocommit = False
                with conn.cursor() as cursor:
                    cursor.execute(f"INSERT INTO {fq_table1} VALUES (1, 't1')")
                    cursor.execute(f"INSERT INTO {fq_table2} VALUES (1, 't2')")
                conn.commit()
                assert _get_row_count(mst_conn_params, fq_table1) == 1
                assert _get_row_count(mst_conn_params, fq_table2) == 1
        finally:
            # Fresh-connection cleanup: works even if the txn above is still open.
            self._drop_table_quietly(mst_conn_params, fq_table2)

    def test_multi_table_rollback(self, mst_conn_params, mst_table, mst_catalog, mst_schema):
        fq_table1, _ = mst_table
        fq_table2 = f"{mst_catalog}.{mst_schema}.{_unique_table_name_raw('multi_rb_t2')}"
        self._create_extra_table(mst_conn_params, fq_table2)
        try:
            with sql.connect(**mst_conn_params) as conn:
                conn.autocommit = False
                with conn.cursor() as cursor:
                    cursor.execute(f"INSERT INTO {fq_table1} VALUES (1, 't1')")
                    cursor.execute(f"INSERT INTO {fq_table2} VALUES (1, 't2')")
                conn.rollback()
                assert _get_row_count(mst_conn_params, fq_table1) == 0
                assert _get_row_count(mst_conn_params, fq_table2) == 0
        finally:
            # Fresh-connection cleanup: works even if the txn above is still open.
            self._drop_table_quietly(mst_conn_params, fq_table2)

    def test_multi_table_atomicity(self, mst_conn_params, mst_table):
        """A statement failing mid-transaction must not let earlier writes persist.

        Despite the name, only one real table is written; the second
        statement targets a nonexistent table to force the failure.
        """
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'should_rollback')")
                with pytest.raises(Exception):
                    cursor.execute(
                        "INSERT INTO nonexistent_table_xyz_xyz VALUES (1, 'fail')"
                    )
            conn.rollback()
        assert _get_row_count(mst_conn_params, fq_table) == 0

    @pytest.mark.xdist_group(name="mst_repeatable_reads")
    def test_repeatable_reads(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'initial')")
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT value FROM {fq_table} WHERE id = 1")
                first_read = cursor.fetchone()[0]
            # Pin the baseline so the comparison below is meaningful.
            assert first_read == "initial"
            # External connection modifies data
            with sql.connect(**mst_conn_params) as ext_conn:
                with ext_conn.cursor() as ext_cursor:
                    ext_cursor.execute(
                        f"UPDATE {fq_table} SET value = 'modified' WHERE id = 1"
                    )
            # Re-read in same txn — should see original value
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT value FROM {fq_table} WHERE id = 1")
                second_read = cursor.fetchone()[0]
            assert first_read == second_read, "Repeatable read: value should not change"
            conn.rollback()

    @pytest.mark.xdist_group(name="mst_write_conflict")
    def test_write_conflict_single_table(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as setup_conn:
            with setup_conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'initial')")
        conn1 = sql.connect(**mst_conn_params)
        conn2 = None
        try:
            # Opened inside the try so conn1 is still closed if this fails.
            conn2 = sql.connect(**mst_conn_params)
            conn1.autocommit = False
            conn2.autocommit = False
            with conn1.cursor() as c1:
                c1.execute(f"UPDATE {fq_table} SET value = 'conn1' WHERE id = 1")
            with conn2.cursor() as c2:
                c2.execute(f"UPDATE {fq_table} SET value = 'conn2' WHERE id = 1")
            conn1.commit()
            # The second writer of the same row must lose at commit time.
            with pytest.raises(Exception):
                conn2.commit()
        finally:
            # Close errors are ignored: conn2 may be in a failed state.
            for conn in (conn1, conn2):
                if conn is None:
                    continue
                try:
                    conn.close()
                except Exception:
                    pass

    def test_read_only_transaction(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'existing')")
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT COUNT(*) FROM {fq_table}")
                assert cursor.fetchone()[0] == 1
            conn.commit()
        assert _get_row_count(mst_conn_params, fq_table) == 1

    def test_rollback_after_query_failure(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'before_error')")
                with pytest.raises(Exception):
                    cursor.execute("SELECT * FROM nonexistent_xyz_xyz")
            conn.rollback()
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (2, 'after_recovery')")
            conn.commit()
        # Only the post-recovery row may survive; a bare count of 1 would
        # also accept the wrong row (id 1) surviving instead.
        assert _get_ids(mst_conn_params, fq_table) == {2}

    def test_multiple_cursors_in_txn(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as c1:
                c1.execute(f"INSERT INTO {fq_table} VALUES (1, 'c1')")
            with conn.cursor() as c2:
                c2.execute(f"INSERT INTO {fq_table} VALUES (2, 'c2')")
            with conn.cursor() as c3:
                c3.execute(f"INSERT INTO {fq_table} VALUES (3, 'c3')")
            conn.commit()
        assert _get_row_count(mst_conn_params, fq_table) == 3

    def test_parameterized_insert(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(
                    f"INSERT INTO {fq_table} VALUES (%(id)s, %(value)s)",
                    {"id": 1, "value": "parameterized"},
                )
            conn.commit()
        with sql.connect(**mst_conn_params) as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT value FROM {fq_table} WHERE id = 1")
                assert cursor.fetchone()[0] == "parameterized"

    def test_empty_transaction_rollback(self, mst_conn_params, mst_table):
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            # Rollback with no DML should not raise
            conn.rollback()

    def test_close_connection_implicit_rollback(self, mst_conn_params, mst_table):
        fq_table, _ = mst_table
        conn = sql.connect(**mst_conn_params)
        try:
            conn.autocommit = False
            with conn.cursor() as cursor:
                cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'pending')")
        finally:
            # Always close (no leak if the INSERT raises); closing with an
            # open transaction is the behavior under test.
            conn.close()
        assert _get_row_count(mst_conn_params, fq_table) == 0
# ==================== B. API-SPECIFIC TESTS ====================
class TestMstApi:
    """DB-API surface for transactions.

    Covers the ``autocommit`` property, ``commit()`` outside a transaction,
    and the transaction isolation-level setters/getters.
    """

    def test_default_autocommit_is_true(self, mst_conn_params):
        with sql.connect(**mst_conn_params) as connection:
            # A freshly opened connection starts in autocommit mode.
            assert connection.autocommit is True

    def test_set_autocommit_false(self, mst_conn_params):
        with sql.connect(**mst_conn_params) as connection:
            connection.autocommit = False
            assert connection.autocommit is False

    def test_commit_without_active_txn_throws(self, mst_conn_params):
        with sql.connect(**mst_conn_params) as connection:
            # No transaction was started, so the server must refuse the commit.
            with pytest.raises(Exception, match=r"NO_ACTIVE_TRANSACTION"):
                connection.commit()

    def test_set_autocommit_during_active_txn_throws(
        self, mst_conn_params, mst_table
    ):
        target = mst_table[0]
        with sql.connect(**mst_conn_params) as connection:
            connection.autocommit = False
            with connection.cursor() as cur:
                cur.execute(f"INSERT INTO {target} VALUES (1, 'active_txn')")
            # The INSERT opened a transaction; switching mode mid-flight must fail.
            with pytest.raises(Exception):
                connection.autocommit = True
            connection.rollback()

    def test_supported_isolation_level(self, mst_conn_params):
        with sql.connect(**mst_conn_params) as connection:
            connection.set_transaction_isolation("REPEATABLE_READ")
            assert connection.get_transaction_isolation() == "REPEATABLE_READ"

    def test_unsupported_isolation_level_rejected(self, mst_conn_params):
        rejected_levels = ("READ_UNCOMMITTED", "READ_COMMITTED", "SERIALIZABLE")
        with sql.connect(**mst_conn_params) as connection:
            for level in rejected_levels:
                with pytest.raises(Exception):
                    connection.set_transaction_isolation(level)
# ==================== C. METADATA RPCs ====================
class TestMstMetadata:
    """Thrift metadata calls issued while a transaction is open.

    ``cursor.columns``/``tables``/``schemas``/``catalogs`` are Thrift RPCs
    that are not routed through the MST context. They therefore return
    current, non-transactional catalog state, including DDL committed by
    other sessions after the transaction began.
    """

    @staticmethod
    def _assert_metadata_nonempty_in_txn(mst_conn_params, fq_table, run_metadata_call):
        """Open a txn with one pending INSERT, run a metadata call, expect rows."""
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
                run_metadata_call(cur)
                assert len(cur.fetchall()) > 0
            conn.rollback()

    def test_cursor_columns_in_mst(
        self, mst_conn_params, mst_table, mst_catalog, mst_schema
    ):
        fq_table, table_name = mst_table
        self._assert_metadata_nonempty_in_txn(
            mst_conn_params,
            fq_table,
            lambda cur: cur.columns(
                catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
            ),
        )

    def test_cursor_tables_in_mst(
        self, mst_conn_params, mst_table, mst_catalog, mst_schema
    ):
        fq_table, table_name = mst_table
        self._assert_metadata_nonempty_in_txn(
            mst_conn_params,
            fq_table,
            lambda cur: cur.tables(
                catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
            ),
        )

    def test_cursor_schemas_in_mst(self, mst_conn_params, mst_table, mst_catalog):
        self._assert_metadata_nonempty_in_txn(
            mst_conn_params,
            mst_table[0],
            lambda cur: cur.schemas(catalog_name=mst_catalog),
        )

    def test_cursor_catalogs_in_mst(self, mst_conn_params, mst_table):
        self._assert_metadata_nonempty_in_txn(
            mst_conn_params,
            mst_table[0],
            lambda cur: cur.catalogs(),
        )

    @pytest.mark.xdist_group(name="mst_freshness_columns")
    def test_cursor_columns_non_transactional_after_concurrent_ddl(
        self, mst_conn_params, mst_table, mst_catalog, mst_schema
    ):
        """Thrift cursor.columns() bypasses MST — sees concurrent ALTER TABLE."""
        fq_table, table_name = mst_table
        scope = dict(
            catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
        )

        def column_names(cur):
            cur.columns(**scope)
            # row[3] is the column-name field (the check for "new_col" relies on it).
            return {row[3].lower() for row in cur.fetchall()}

        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
                cols_before = column_names(cur)
            # Another session changes the schema while our txn is open.
            with sql.connect(**mst_conn_params) as other, other.cursor() as other_cur:
                other_cur.execute(f"ALTER TABLE {fq_table} ADD COLUMN new_col STRING")
            # Same txn again: the RPC ignores txn isolation, so new_col shows up.
            with conn.cursor() as cur:
                cols_after = column_names(cur)
                assert "new_col" in cols_after, (
                    "Thrift cursor.columns() should see concurrent DDL "
                    "(non-transactional behavior)"
                )
                assert cols_before != cols_after
            conn.rollback()

    @pytest.mark.xdist_group(name="mst_freshness_tables")
    def test_cursor_tables_non_transactional_after_concurrent_create(
        self, mst_conn_params, mst_table, mst_catalog, mst_schema
    ):
        """Thrift cursor.tables() bypasses MST — sees concurrent CREATE TABLE."""
        fq_table = mst_table[0]
        new_table_name = _unique_table_name_raw("freshness_new_tbl")
        fq_new_table = f"{mst_catalog}.{mst_schema}.{new_table_name}"
        lookup = dict(
            catalog_name=mst_catalog,
            schema_name=mst_schema,
            table_name=new_table_name,
        )
        try:
            with sql.connect(**mst_conn_params) as conn:
                conn.autocommit = False
                with conn.cursor() as cur:
                    cur.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
                    cur.tables(**lookup)
                    assert len(cur.fetchall()) == 0
                # Another session creates the table while our txn is open.
                with sql.connect(**mst_conn_params) as other, other.cursor() as other_cur:
                    other_cur.execute(
                        f"CREATE TABLE {fq_new_table} (id INT) USING DELTA "
                        f"TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported')"
                    )
                # Same txn again: the new table must already be listed.
                with conn.cursor() as cur:
                    cur.tables(**lookup)
                    assert len(cur.fetchall()) > 0, (
                        "Thrift cursor.tables() should see concurrent CREATE TABLE "
                        "(non-transactional behavior)"
                    )
                conn.rollback()
        finally:
            try:
                with sql.connect(**mst_conn_params) as cleanup, cleanup.cursor() as cur:
                    cur.execute(f"DROP TABLE IF EXISTS {fq_new_table}")
            except Exception as e:
                logger.warning(f"Failed to drop {fq_new_table}: {e}")
# ==================== D. BLOCKED SQL (MSTCheckRule) ====================
class TestMstBlockedSql:
    """Introspection SQL executed inside an active transaction.

    The server only permits an allowlist of statements inside an MST,
    enforced by MSTCheckRule. Its TRANSACTION_NOT_SUPPORTED.COMMAND error
    originally advertised only:
        "Only SELECT / INSERT / MERGE / UPDATE / DELETE / DESCRIBE TABLE are supported."
    Current DBSQL warehouses additionally accept SHOW COLUMNS
    (ShowDeltaTableColumnsCommand).

    Rejected (the statement throws and the transaction is aborted):
      - SHOW TABLES, SHOW SCHEMAS, SHOW CATALOGS, SHOW FUNCTIONS
      - DESCRIBE QUERY, DESCRIBE TABLE EXTENDED
      - SELECT FROM information_schema
    Accepted:
      - DESCRIBE TABLE (basic form)
      - SHOW COLUMNS
    """

    def _assert_blocked_and_txn_aborted(self, mst_conn_params, fq_table, blocked_sql):
        """Run *blocked_sql* mid-transaction; expect it and the next DML to fail."""
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(f"INSERT INTO {fq_table} VALUES (1, 'before_blocked')")
                with pytest.raises(Exception):
                    cur.execute(blocked_sql)
                # After the rejection the transaction is unusable, so further DML fails.
                follow_up = f"INSERT INTO {fq_table} VALUES (2, 'after_blocked')"
                with pytest.raises(Exception):
                    cur.execute(follow_up)
            try:
                conn.rollback()
            except Exception:
                # Best effort: the aborted transaction may not be rollback-able.
                pass

    def _assert_not_blocked(self, mst_conn_params, fq_table, allowed_sql):
        """Run *allowed_sql* mid-transaction; it must succeed and return rows."""
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(f"INSERT INTO {fq_table} VALUES (1, 'before')")
                cur.execute(allowed_sql)
                result_rows = cur.fetchall()
                assert len(result_rows) > 0
            conn.rollback()

    def test_show_tables_blocked(self, mst_conn_params, mst_table, mst_catalog, mst_schema):
        self._assert_blocked_and_txn_aborted(
            mst_conn_params, mst_table[0], f"SHOW TABLES IN {mst_catalog}.{mst_schema}"
        )

    def test_show_schemas_blocked(self, mst_conn_params, mst_table, mst_catalog):
        self._assert_blocked_and_txn_aborted(
            mst_conn_params, mst_table[0], f"SHOW SCHEMAS IN {mst_catalog}"
        )

    def test_show_catalogs_blocked(self, mst_conn_params, mst_table):
        self._assert_blocked_and_txn_aborted(
            mst_conn_params, mst_table[0], "SHOW CATALOGS"
        )

    def test_show_functions_blocked(self, mst_conn_params, mst_table):
        self._assert_blocked_and_txn_aborted(
            mst_conn_params, mst_table[0], "SHOW FUNCTIONS"
        )

    def test_describe_table_extended_blocked(self, mst_conn_params, mst_table):
        fq_table = mst_table[0]
        self._assert_blocked_and_txn_aborted(
            mst_conn_params, fq_table, f"DESCRIBE TABLE EXTENDED {fq_table}"
        )

    def test_information_schema_blocked(self, mst_conn_params, mst_table, mst_catalog):
        query = f"SELECT * FROM {mst_catalog}.information_schema.columns LIMIT 1"
        self._assert_blocked_and_txn_aborted(mst_conn_params, mst_table[0], query)

    def test_show_columns_not_blocked(self, mst_conn_params, mst_table):
        """SHOW COLUMNS succeeds in MST — allowed by the server's MSTCheckRule allowlist."""
        fq_table = mst_table[0]
        self._assert_not_blocked(
            mst_conn_params, fq_table, f"SHOW COLUMNS IN {fq_table}"
        )

    def test_describe_query_blocked(self, mst_conn_params, mst_table):
        """DESCRIBE QUERY is blocked in MST (DescribeQueryCommand)."""
        fq_table = mst_table[0]
        self._assert_blocked_and_txn_aborted(
            mst_conn_params,
            fq_table,
            f"DESCRIBE QUERY SELECT * FROM {fq_table}",
        )

    def test_describe_table_not_blocked(self, mst_conn_params, mst_table):
        """DESCRIBE TABLE succeeds in MST — explicitly allowed by the server.

        It is named in the server's TRANSACTION_NOT_SUPPORTED.COMMAND message:
        "Only SELECT / INSERT / MERGE / UPDATE / DELETE / DESCRIBE TABLE are supported."
        """
        fq_table = mst_table[0]
        self._assert_not_blocked(
            mst_conn_params, fq_table, f"DESCRIBE TABLE {fq_table}"
        )
# ==================== E. EXECUTE VARIANTS ====================
class TestMstExecuteVariants:
    """``cursor.executemany`` inside an explicit transaction."""

    def test_executemany_in_txn(self, mst_conn_params, mst_table):
        fq_table = mst_table[0]
        insert_sql = f"INSERT INTO {fq_table} VALUES (%(id)s, %(value)s)"
        batch = [
            {"id": row_id, "value": text}
            for row_id, text in ((1, "a"), (2, "b"), (3, "c"))
        ]
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.executemany(insert_sql, batch)
            conn.commit()
        # All three parameter sets must land together on commit.
        assert _get_row_count(mst_conn_params, fq_table) == 3

    def test_executemany_rollback_in_txn(self, mst_conn_params, mst_table):
        fq_table = mst_table[0]
        insert_sql = f"INSERT INTO {fq_table} VALUES (%(id)s, %(value)s)"
        batch = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
        with sql.connect(**mst_conn_params) as conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.executemany(insert_sql, batch)
            conn.rollback()
        # Rolling back discards every row written by the batch.
        assert _get_row_count(mst_conn_params, fq_table) == 0