Skip to content

Commit c321bbf

Browse files
authored
Merge branch 'main' into main
2 parents 9aba22b + de440ef commit c321bbf

25 files changed

Lines changed: 5219 additions & 7 deletions

.prettierignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ node_modules
55
.nyc_output
66
coverage_e2e
77
coverage_unit
8+
coverage
89
.clinic
910

1011
dist

lib/DBSQLClient.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
238238
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
239239
}
240240

241+
// Persist userAgentEntry so telemetry and feature-flag call sites reuse
242+
// the same value as the primary Thrift connection's User-Agent.
243+
if (options.userAgentEntry !== undefined) {
244+
this.config.userAgentEntry = options.userAgentEntry;
245+
}
246+
241247
this.authProvider = this.createAuthProvider(options, authProvider);
242248

243249
this.connectionProvider = this.createConnectionProvider(options);
@@ -362,4 +368,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
362368
public async getDriver(): Promise<IDriver> {
363369
return this.driver;
364370
}
371+
372+
/**
373+
* Returns the authentication provider associated with this client, if any.
374+
* Intended for internal telemetry/feature-flag call sites that need to
375+
* obtain auth headers directly without routing through `IClientContext`.
376+
*
377+
* @internal Not part of the public API. May change without notice.
378+
*/
379+
public getAuthProvider(): IAuthentication | undefined {
380+
return this.authProvider;
381+
}
365382
}

lib/contracts/IClientContext.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,16 @@ export interface ClientConfig {
2828
telemetryBatchSize?: number;
2929
telemetryFlushIntervalMs?: number;
3030
telemetryMaxRetries?: number;
31+
telemetryBackoffBaseMs?: number;
32+
telemetryBackoffMaxMs?: number;
33+
telemetryBackoffJitterMs?: number;
3134
telemetryAuthenticatedExport?: boolean;
3235
telemetryCircuitBreakerThreshold?: number;
3336
telemetryCircuitBreakerTimeout?: number;
37+
telemetryMaxPendingMetrics?: number;
38+
telemetryMaxErrorsPerStatement?: number;
39+
telemetryStatementTtlMs?: number;
40+
userAgentEntry?: string;
3441
}
3542

3643
export default interface IClientContext {

lib/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import { LogLevel } from './contracts/IDBSQLLogger';
2323
// Re-export types for TypeScript users
2424
export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider';
2525

26+
// Re-export telemetry error classes so consumers can instanceof-check rather
27+
// than string-matching error messages.
28+
export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker';
29+
export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter';
30+
2631
export const auth = {
2732
PlainHttpAuthentication,
2833
// Token provider classes for custom authentication

lib/telemetry/CircuitBreaker.ts

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import IClientContext from '../contracts/IClientContext';
18+
import { LogLevel } from '../contracts/IDBSQLLogger';
19+
20+
export enum CircuitBreakerState {
21+
CLOSED = 'CLOSED',
22+
OPEN = 'OPEN',
23+
HALF_OPEN = 'HALF_OPEN',
24+
}
25+
26+
export interface CircuitBreakerConfig {
27+
failureThreshold: number;
28+
timeout: number;
29+
successThreshold: number;
30+
}
31+
32+
export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly<CircuitBreakerConfig> = Object.freeze({
33+
failureThreshold: 5,
34+
timeout: 60000,
35+
successThreshold: 2,
36+
});
37+
38+
export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const;
39+
40+
/**
41+
* Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN
42+
* probe is already in flight. Callers identify the condition via
43+
* `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE`
44+
* rather than string-matching the message.
45+
*/
46+
export class CircuitBreakerOpenError extends Error {
47+
readonly code = CIRCUIT_BREAKER_OPEN_CODE;
48+
49+
constructor(message = 'Circuit breaker OPEN') {
50+
super(message);
51+
this.name = 'CircuitBreakerOpenError';
52+
}
53+
}
54+
55+
export class CircuitBreaker {
56+
private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
57+
58+
private failureCount = 0;
59+
60+
private successCount = 0;
61+
62+
private nextAttempt?: Date;
63+
64+
private halfOpenInflight = 0;
65+
66+
private readonly config: CircuitBreakerConfig;
67+
68+
constructor(private context: IClientContext, config?: Partial<CircuitBreakerConfig>) {
69+
this.config = {
70+
...DEFAULT_CIRCUIT_BREAKER_CONFIG,
71+
...config,
72+
};
73+
}
74+
75+
async execute<T>(operation: () => Promise<T>): Promise<T> {
76+
const admitted = this.tryAdmit();
77+
if (!admitted) {
78+
throw new CircuitBreakerOpenError();
79+
}
80+
81+
const { wasHalfOpenProbe } = admitted;
82+
83+
try {
84+
const result = await operation();
85+
this.onSuccess();
86+
return result;
87+
} catch (error) {
88+
this.onFailure();
89+
throw error;
90+
} finally {
91+
if (wasHalfOpenProbe && this.halfOpenInflight > 0) {
92+
this.halfOpenInflight -= 1;
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Synchronous admission check. Returning `null` means "reject". Returning
99+
* an object means the caller is admitted; `wasHalfOpenProbe` indicates
100+
* whether this admission consumed the single HALF_OPEN probe slot so the
101+
* caller can decrement it in `finally`.
102+
*
103+
* Running this as a single synchronous block is what prevents the
104+
* concurrent-probe race that existed in the previous implementation.
105+
*/
106+
private tryAdmit(): { wasHalfOpenProbe: boolean } | null {
107+
const logger = this.context.getLogger();
108+
109+
if (this.state === CircuitBreakerState.OPEN) {
110+
if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
111+
return null;
112+
}
113+
this.state = CircuitBreakerState.HALF_OPEN;
114+
this.successCount = 0;
115+
this.halfOpenInflight = 0;
116+
logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
117+
}
118+
119+
if (this.state === CircuitBreakerState.HALF_OPEN) {
120+
if (this.halfOpenInflight > 0) {
121+
return null;
122+
}
123+
this.halfOpenInflight += 1;
124+
return { wasHalfOpenProbe: true };
125+
}
126+
127+
return { wasHalfOpenProbe: false };
128+
}
129+
130+
getState(): CircuitBreakerState {
131+
return this.state;
132+
}
133+
134+
getFailureCount(): number {
135+
return this.failureCount;
136+
}
137+
138+
getSuccessCount(): number {
139+
return this.successCount;
140+
}
141+
142+
private onSuccess(): void {
143+
const logger = this.context.getLogger();
144+
145+
this.failureCount = 0;
146+
147+
if (this.state === CircuitBreakerState.HALF_OPEN) {
148+
this.successCount += 1;
149+
logger.log(
150+
LogLevel.debug,
151+
`Circuit breaker success in HALF_OPEN (${this.successCount}/${this.config.successThreshold})`,
152+
);
153+
154+
if (this.successCount >= this.config.successThreshold) {
155+
this.state = CircuitBreakerState.CLOSED;
156+
this.successCount = 0;
157+
this.nextAttempt = undefined;
158+
logger.log(LogLevel.debug, 'Circuit breaker transitioned to CLOSED');
159+
}
160+
}
161+
}
162+
163+
private onFailure(): void {
164+
const logger = this.context.getLogger();
165+
166+
this.failureCount += 1;
167+
this.successCount = 0;
168+
169+
logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);
170+
171+
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
172+
this.state = CircuitBreakerState.OPEN;
173+
this.nextAttempt = new Date(Date.now() + this.config.timeout);
174+
logger.log(
175+
LogLevel.warn,
176+
`Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
177+
);
178+
}
179+
}
180+
}
181+
182+
export class CircuitBreakerRegistry {
183+
private breakers: Map<string, CircuitBreaker>;
184+
185+
constructor(private context: IClientContext) {
186+
this.breakers = new Map();
187+
}
188+
189+
getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
190+
let breaker = this.breakers.get(host);
191+
if (!breaker) {
192+
breaker = new CircuitBreaker(this.context, config);
193+
this.breakers.set(host, breaker);
194+
const logger = this.context.getLogger();
195+
logger.log(LogLevel.debug, `Created circuit breaker for host: ${host}`);
196+
} else if (config) {
197+
const logger = this.context.getLogger();
198+
logger.log(LogLevel.debug, `Circuit breaker for host ${host} already exists; provided config will be ignored`);
199+
}
200+
return breaker;
201+
}
202+
203+
getAllBreakers(): Map<string, CircuitBreaker> {
204+
return new Map(this.breakers);
205+
}
206+
207+
removeCircuitBreaker(host: string): void {
208+
this.breakers.delete(host);
209+
const logger = this.context.getLogger();
210+
logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
211+
}
212+
213+
clear(): void {
214+
this.breakers.clear();
215+
}
216+
}

0 commit comments

Comments
 (0)