Skip to content

Commit 4c0ab66

Browse files
committed
Fetch ClickHouse MergeTree limits dynamically and update backpressure configuration. Adjust schema defaults for compatibility.
1 parent a7b28fc commit 4c0ab66

4 files changed

Lines changed: 65 additions & 4 deletions

File tree

src/config/schema.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ export const configSchema = z.object({
8484
enabled: booleanFromEnv.default(true),
8585
partsToThrowInsert: intFromEnv.default(300),
8686
maxPartsInTotal: intFromEnv.default(500),
87-
partitionPctHigh: numberFromEnv.default(0.50).pipe(z.number().min(0).max(1)),
88-
partitionPctLow: numberFromEnv.default(0.35).pipe(z.number().min(0).max(1)),
89-
totalPctHigh: numberFromEnv.default(0.50).pipe(z.number().min(0).max(1)),
90-
totalPctLow: numberFromEnv.default(0.40).pipe(z.number().min(0).max(1)),
87+
partitionPctHigh: numberFromEnv.default(0.70).pipe(z.number().min(0).max(1)),
88+
partitionPctLow: numberFromEnv.default(0.55).pipe(z.number().min(0).max(1)),
89+
totalPctHigh: numberFromEnv.default(0.70).pipe(z.number().min(0).max(1)),
90+
totalPctLow: numberFromEnv.default(0.55).pipe(z.number().min(0).max(1)),
9191
pollIntervalMs: intFromEnv.default(15_000),
9292
maxPauseEpisodeMs: intFromEnv.default(180_000),
9393
}),

src/main.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ async function main(): Promise<void> {
8787
request_timeout: config.target.queryTimeoutMs,
8888
});
8989

90+
// Fetch actual ClickHouse MergeTree settings and override config defaults
91+
const serverLimits = await ClickHousePressure.fetchServerLimits(pressureClient, logger);
92+
config.backpressure.partsToThrowInsert = serverLimits.partsToThrowInsert;
93+
config.backpressure.maxPartsInTotal = serverLimits.maxPartsInTotal;
94+
9095
const chPressure = new ClickHousePressure(pressureClient, config.backpressure, logger);
9196

9297
// Map config.memory to GcConfig shape

src/source/discover-collections.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ export async function discoverCollections(
2020

2121
const names = collections.map((c) => c.name).sort();
2222

23+
// Ensure the base collection (exact prefix match) is always processed first
24+
const baseIdx = names.indexOf(prefix);
25+
if (baseIdx > 0) {
26+
names.splice(baseIdx, 1);
27+
names.unshift(prefix);
28+
}
29+
2330
if (names.length === 0) {
2431
throw new Error(
2532
`No collections found matching prefix "${prefix}" in database "${db.databaseName}"`,

src/target/clickhouse-pressure.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ export interface PressureState {
3030
pauseReason: string | null;
3131
}
3232

33+
export interface ServerMergeTreeLimits {
34+
partsToThrowInsert: number;
35+
partsToDelayInsert: number;
36+
maxPartsInTotal: number;
37+
inactivePartsToThrowInsert: number;
38+
inactivePartsToDelayInsert: number;
39+
}
40+
3341
export class ClickHousePressure {
3442
private readonly client: ClickHouseClient;
3543
private readonly config: BackpressureConfig;
@@ -41,6 +49,47 @@ export class ClickHousePressure {
4149
this.logger = logger.child({ component: 'clickhouse-pressure' });
4250
}
4351

52+
/**
53+
* Query the actual MergeTree settings from the ClickHouse server.
54+
* Returns the server-side limits that govern when inserts are rejected or delayed.
55+
*/
56+
static async fetchServerLimits(
57+
client: ClickHouseClient,
58+
logger: Logger,
59+
): Promise<ServerMergeTreeLimits> {
60+
const result = await client.query({
61+
query: `
62+
SELECT name, value
63+
FROM system.merge_tree_settings
64+
WHERE name IN (
65+
'parts_to_throw_insert',
66+
'parts_to_delay_insert',
67+
'max_parts_in_total',
68+
'inactive_parts_to_throw_insert',
69+
'inactive_parts_to_delay_insert'
70+
)
71+
`,
72+
format: 'JSONEachRow',
73+
});
74+
75+
const rows = await result.json<{ name: string; value: string }>();
76+
const map: Record<string, number> = {};
77+
for (const row of rows) {
78+
map[row.name] = Number(row.value);
79+
}
80+
81+
const limits: ServerMergeTreeLimits = {
82+
partsToThrowInsert: map['parts_to_throw_insert'] ?? 300,
83+
partsToDelayInsert: map['parts_to_delay_insert'] ?? 150,
84+
maxPartsInTotal: map['max_parts_in_total'] ?? 100000,
85+
inactivePartsToThrowInsert: map['inactive_parts_to_throw_insert'] ?? 0,
86+
inactivePartsToDelayInsert: map['inactive_parts_to_delay_insert'] ?? 0,
87+
};
88+
89+
logger.info({ limits }, 'Fetched ClickHouse merge_tree_settings');
90+
return limits;
91+
}
92+
4493
async sample(database: string, table: string): Promise<PressureState> {
4594
const results = await Promise.allSettled([
4695
this.queryActivePartsTotal(database, table),

0 commit comments

Comments
 (0)