Skip to content

Commit c667602

Browse files
authored
Statement level query tag in NodeJS connector (#366)
* add statement level query tags support
  Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
* Escape backslashes in query tag keys
  Co-authored-by: Isaac
  Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
* Support setting session level tags as dict
  Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
* add sample code for query tags with both session and statement level
  Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
* Fix lint errors
  - Use dot notation for QUERY_TAGS configuration access
  - Convert serializeQueryTags to default export
  Co-authored-by: Isaac
  Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
---------
Signed-off-by: Jiabin Hu <jiabin.hu@databricks.com>
1 parent de440ef commit c667602

11 files changed

Lines changed: 287 additions & 4 deletions

File tree

examples/query_tags.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
// Example: attaching query tags at the session level and at the statement level.
//
// Connection settings are read from the DATABRICKS_HOST, DATABRICKS_HTTP_PATH and
// DATABRICKS_TOKEN environment variables.
const { DBSQLClient } = require('..');

const client = new DBSQLClient();

const host = process.env.DATABRICKS_HOST;
const path = process.env.DATABRICKS_HTTP_PATH;
const token = process.env.DATABRICKS_TOKEN;

/**
 * Prints every row of an operation and then closes it.
 * The operation is closed even when fetching the rows fails.
 *
 * @param operation - operation returned by `session.executeStatement`
 */
async function printAndClose(operation) {
  try {
    console.log(await operation.fetchAll());
  } finally {
    await operation.close();
  }
}

client
  .connect({ host, path, token })
  .then(async (connectedClient) => {
    // try/finally blocks guarantee that the session and the client are closed
    // even when one of the statements below fails.
    try {
      // Session-level query tags: applied to every statement run on this session
      // (serialized into the session's QUERY_TAGS configuration).
      const session = await connectedClient.openSession({
        queryTags: {
          team: 'engineering',
          env: 'dev',
          driver: 'node',
        },
      });

      try {
        // Statement A: inherits session-level tags only.
        await printAndClose(await session.executeStatement('SELECT 1 AS inherits_session_tags'));

        // Statement B: statement-level query tags via executeStatement options.
        // These are passed via confOverlay as "query_tags" and apply ONLY to this statement.
        // Note: `env` here overrides the session-level `env: 'dev'` — for this statement
        // it will be `env: 'prod'`. Subsequent statements without statement-level tags
        // revert to the session-level values.
        await printAndClose(
          await session.executeStatement('SELECT 2 AS has_statement_tags', {
            queryTags: {
              env: 'prod',
              request_id: 'abc-123',
              feature: 'reporting',
            },
          }),
        );

        // Statement C: demonstrates escaping of special characters (`\`, `:`, `,`)
        // in tag values, plus null/undefined values which serialize as bare keys.
        await printAndClose(
          await session.executeStatement('SELECT 3 AS escaped_and_null_tags', {
            queryTags: {
              path: 'C:\\users\\me',
              note: 'hello, world',
              flag: null,
            },
          }),
        );
      } finally {
        await session.close();
      }
    } finally {
      await connectedClient.close();
    }
  })
  .catch((error) => {
    console.log(error);
  });

examples/session_params.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ client
1010
.connect({ host, path, token })
1111
.then(async (client) => {
1212
const session = await client.openSession({
13+
queryTags: {
14+
team: 'engineering',
15+
test: 'session-params',
16+
driver: 'node',
17+
},
1318
configuration: {
14-
QUERY_TAGS: 'team:engineering,test:session-params,driver:node',
1519
ansi_mode: 'false',
1620
},
1721
});

lib/DBSQLClient.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import HttpConnection from './connection/connections/HttpConnection';
1616
import IConnectionOptions from './connection/contracts/IConnectionOptions';
1717
import Status from './dto/Status';
1818
import HiveDriverError from './errors/HiveDriverError';
19-
import { buildUserAgentString, definedOrError } from './utils';
19+
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
2020
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
2121
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
2222
import {
@@ -298,6 +298,16 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
298298
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
299299
}
300300

301+
// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
302+
if (request.queryTags !== undefined) {
303+
const serialized = serializeQueryTags(request.queryTags);
304+
if (serialized) {
305+
configuration.QUERY_TAGS = serialized;
306+
} else {
307+
delete configuration.QUERY_TAGS;
308+
}
309+
}
310+
301311
const response = await this.driver.openSession({
302312
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
303313
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),

lib/DBSQLSession.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import IOperation from './contracts/IOperation';
3131
import DBSQLOperation from './DBSQLOperation';
3232
import Status from './dto/Status';
3333
import InfoValue from './dto/InfoValue';
34-
import { definedOrError, LZ4, ProtocolVersion } from './utils';
34+
import { definedOrError, LZ4, ProtocolVersion, serializeQueryTags } from './utils';
3535
import CloseableCollection from './utils/CloseableCollection';
3636
import { LogLevel } from './contracts/IDBSQLLogger';
3737
import HiveDriverError from './errors/HiveDriverError';
@@ -227,6 +227,11 @@ export default class DBSQLSession implements IDBSQLSession {
227227
request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters);
228228
}
229229

230+
const serializedQueryTags = serializeQueryTags(options.queryTags);
231+
if (serializedQueryTags !== undefined) {
232+
request.confOverlay = { ...request.confOverlay, query_tags: serializedQueryTags };
233+
}
234+
230235
if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) {
231236
request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch;
232237
}

lib/contracts/IDBSQLClient.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ export interface OpenSessionRequest {
6060
initialCatalog?: string;
6161
initialSchema?: string;
6262
configuration?: { [key: string]: string };
63+
/**
64+
* Session-level query tags as key-value pairs. Serialized and passed via session configuration
65+
* as "QUERY_TAGS". Values may be null/undefined to include a key without a value.
66+
* If both queryTags and configuration.QUERY_TAGS are specified, queryTags takes precedence.
67+
*/
68+
queryTags?: Record<string, string | null | undefined>;
6369
}
6470

6571
export default interface IDBSQLClient {

lib/contracts/IDBSQLSession.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ export type ExecuteStatementOptions = {
2121
stagingAllowedLocalPath?: string | string[];
2222
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>;
2323
ordinalParameters?: Array<DBSQLParameter | DBSQLParameterValue>;
24+
/**
25+
* Per-statement query tags as key-value pairs. Serialized and passed via confOverlay
26+
* as "query_tags". Values may be null/undefined to include a key without a value.
27+
* These tags apply only to this statement and do not persist across queries.
28+
*/
29+
queryTags?: Record<string, string | null | undefined>;
2430
};
2531

2632
export type TypeInfoRequest = {

lib/utils/index.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,14 @@ import buildUserAgentString from './buildUserAgentString';
33
import formatProgress, { ProgressUpdateTransformer } from './formatProgress';
44
import LZ4 from './lz4';
55
import * as ProtocolVersion from './protocolVersion';
6+
import serializeQueryTags from './queryTags';
67

7-
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4, ProtocolVersion };
8+
export {
9+
definedOrError,
10+
buildUserAgentString,
11+
formatProgress,
12+
ProgressUpdateTransformer,
13+
LZ4,
14+
ProtocolVersion,
15+
serializeQueryTags,
16+
};

lib/utils/queryTags.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
/**
 * Serializes a query tags dictionary into its string form. The result is used both for
 * the session-level `QUERY_TAGS` configuration and for the per-statement `query_tags`
 * confOverlay entry.
 *
 * Format: comma-separated key:value pairs, e.g. "key1:value1,key2:value2"
 * - If a value is null or undefined, the key is included without a colon or value
 * - Backslashes in keys are escaped; other special characters in keys are not escaped
 * - Special characters (backslash, colon, comma) in values are backslash-escaped
 * - Non-string values (possible when called from untyped JavaScript) are converted
 *   with `String(...)` before escaping instead of raising a TypeError
 *
 * @param queryTags - dictionary of query tag key-value pairs
 * @returns serialized string, or undefined if input is empty/null/undefined
 */
export default function serializeQueryTags(
  queryTags: Record<string, string | null | undefined> | null | undefined,
): string | undefined {
  if (queryTags == null) {
    return undefined;
  }

  const keys = Object.keys(queryTags);
  if (keys.length === 0) {
    return undefined;
  }

  return keys
    .map((key) => {
      const escapedKey = key.replace(/\\/g, '\\\\');
      const value = queryTags[key];
      if (value == null) {
        // Bare key: no colon and no value.
        return escapedKey;
      }
      // The declared type only allows strings, but plain JavaScript callers can pass
      // numbers or booleans; coerce so `.replace` is always available.
      const escapedValue = String(value).replace(/[\\:,]/g, (c) => `\\${c}`);
      return `${escapedKey}:${escapedValue}`;
    })
    .join(',');
}

tests/unit/DBSQLClient.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,4 +636,49 @@ describe('DBSQLClient.enableMetricViewMetadata', () => {
636636
'spark.sql.thriftserver.metadata.metricview.enabled': 'true',
637637
});
638638
});
639+
// queryTags passed to openSession are serialized into configuration.QUERY_TAGS.
it('should serialize queryTags dict and set in session configuration', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  const queryTags = { team: 'data-eng', project: 'etl' };
  await dbsqlClient.openSession({ queryTags });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({
    QUERY_TAGS: 'team:data-eng,project:etl',
  });
});

// An explicit queryTags dict wins over a pre-serialized configuration.QUERY_TAGS;
// unrelated configuration keys are left untouched.
it('should let queryTags take precedence over configuration.QUERY_TAGS', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  const queryTags = { team: 'new-team' };
  const configuration = { QUERY_TAGS: 'team:old-team,other:value', ansi_mode: 'true' };
  await dbsqlClient.openSession({ queryTags, configuration });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({
    QUERY_TAGS: 'team:new-team',
    ansi_mode: 'true',
  });
});

// An empty queryTags dict removes QUERY_TAGS from the configuration that is sent.
it('should remove QUERY_TAGS from configuration when queryTags is empty', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  const queryTags = {};
  const configuration = { QUERY_TAGS: 'team:old-team', ansi_mode: 'true' };
  await dbsqlClient.openSession({ queryTags, configuration });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({
    ansi_mode: 'true',
  });
});
639684
});

tests/unit/DBSQLSession.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,44 @@ describe('DBSQLSession', () => {
259259
});
260260
});
261261

// Covers how executeStatement translates the `queryTags` option into the
// `query_tags` entry of the request's confOverlay.
describe('executeStatement with queryTags', () => {
  it('should set confOverlay with query_tags when queryTags are provided', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const dbsqlSession = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    const queryTags = { team: 'eng', app: 'etl' };
    await dbsqlSession.executeStatement('SELECT 1', { queryTags });

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [request] = driverSpy.executeStatement.firstCall.args;
    expect(request.confOverlay).to.deep.include({ query_tags: 'team:eng,app:etl' });
  });

  it('should not set confOverlay query_tags when queryTags is not provided', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const dbsqlSession = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    await dbsqlSession.executeStatement('SELECT 1');

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [request] = driverSpy.executeStatement.firstCall.args;
    expect(request.confOverlay?.query_tags).to.be.undefined;
  });

  it('should not set confOverlay query_tags when queryTags is empty', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const dbsqlSession = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    const queryTags = {};
    await dbsqlSession.executeStatement('SELECT 1', { queryTags });

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [request] = driverSpy.executeStatement.firstCall.args;
    expect(request.confOverlay?.query_tags).to.be.undefined;
  });
});
299+
262300
describe('getTypeInfo', () => {
263301
it('should run operation', async () => {
264302
const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() });

0 commit comments

Comments
 (0)