Skip to content

Commit ee8864f

Browse files
committed
Add HashResolver to handle collection defaults via SHA1 hash resolution and enhance migration progress tracking
- Introduced `HashResolver` to map drill_events{sha1} collections to (appId, eventName) pairs. - Updated schema and config to include `countlyDb` for hash resolution. - Integrated `HashResolver` into Orchestrator for resolving collection-level defaults. - Enhanced progress tracking with estimated document counts and collection-level stats in HTTP stats route. - Refined document transformation to leverage collection hash defaults for `a` and `e` fields.
1 parent aba78d5 commit ee8864f

8 files changed

Lines changed: 357 additions & 9 deletions

File tree

src/config/loader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ function envToRawConfig(env: NodeJS.ProcessEnv) {
2222
source: {
2323
uri: env.MONGO_URI,
2424
db: env.MONGO_DB,
25+
countlyDb: env.MONGO_COUNTLY_DB,
2526
collectionPrefix: env.MONGO_COLLECTION_PREFIX,
2627
readPreference: env.MONGO_READ_PREFERENCE,
2728
readConcern: env.MONGO_READ_CONCERN,

src/config/schema.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export const configSchema = z.object({
4949
source: z.object({
5050
uri: z.string().min(1),
5151
db: z.string().default("countly_drill"),
52+
countlyDb: z.string().default("countly"),
5253
collectionPrefix: z.string().default("drill_events"),
5354
readPreference: z.string().default("primary"),
5455
readConcern: z.string().default("majority"),

src/http/stats-route.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,35 @@ import type { CommandFlags } from '../state/redis-hot-state.ts';
88
import type { Config } from '../config/schema.ts';
99
import type { CollectionOrchestrator, OrchestratorProgress } from '../runtime/collection-orchestrator.ts';
1010

11+
// ─────────────────────────────────────────────────────────────────────────────
12+
// Formatting helpers
13+
// ─────────────────────────────────────────────────────────────────────────────
14+
15+
function progressBar(pct: number, width = 30): string {
16+
const filled = Math.round((pct / 100) * width);
17+
const arrow = filled < width ? '>' : '';
18+
const empty = Math.max(0, width - filled - (arrow ? 1 : 0));
19+
return '[' + '='.repeat(filled) + arrow + ' '.repeat(empty) + '] ' + pct + '%';
20+
}
21+
22+
function fmtDuration(ms: number): string {
23+
const s = Math.floor(ms / 1000);
24+
const h = Math.floor(s / 3600);
25+
const m = Math.floor((s % 3600) / 60);
26+
const sec = s % 60;
27+
if (h > 0) return `${h}h ${m}m ${sec}s`;
28+
if (m > 0) return `${m}m ${sec}s`;
29+
return `${sec}s`;
30+
}
31+
32+
function fmtNum(n: number): string {
33+
return n.toLocaleString('en-US');
34+
}
35+
36+
// ─────────────────────────────────────────────────────────────────────────────
37+
// Route
38+
// ─────────────────────────────────────────────────────────────────────────────
39+
1140
export interface StatsDeps {
1241
orchestrator: CollectionOrchestrator;
1342
mongoReader: { isConnected(): boolean };
@@ -46,6 +75,7 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
4675
const batchStats = orchestrator.getStats();
4776
const runnerStatus = orchestrator.getStatus();
4877
const currentBatchSeq = orchestrator.getCurrentBatchSeq();
78+
const estimatedCounts = orchestrator.getEstimatedCounts();
4979

5080
// Find the current run ID from the orchestrator's progress
5181
const currentResult = progress.results.find(
@@ -78,6 +108,27 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
78108
const elapsedMs = batchStats?.elapsedMs ?? 0;
79109
const elapsedSec = elapsedMs / 1000;
80110

111+
// ── Progress calculations ───────────────────────────────────────────
112+
const currentCollEstimate = progress.currentCollection
113+
? estimatedCounts.get(progress.currentCollection) ?? 0 : 0;
114+
const currentCollDocsRead = totalDocsRead;
115+
const currentCollPct = currentCollEstimate > 0
116+
? Math.min(100, Math.round((currentCollDocsRead / currentCollEstimate) * 100)) : 0;
117+
118+
// Overall: completed collections use actual docsRead, fallback to estimate
119+
const totalEstimated = Array.from(estimatedCounts.values()).reduce((a, b) => a + b, 0);
120+
const completedDocsRead = progress.results
121+
.filter(r => r.status === 'completed' || r.status === 'skipped')
122+
.reduce((sum, r) => sum + (r.docsRead ?? estimatedCounts.get(r.collection) ?? 0), 0);
123+
const overallDocsRead = completedDocsRead + currentCollDocsRead;
124+
const overallPct = totalEstimated > 0
125+
? Math.min(100, Math.round((overallDocsRead / totalEstimated) * 100)) : 0;
126+
127+
// ETA based on elapsed time and progress percentage
128+
const etaMs = overallPct > 0 && overallPct < 100 && elapsedMs > 0
129+
? Math.round(((Date.now() - startedAt.getTime()) / overallPct) * (100 - overallPct))
130+
: null;
131+
81132
// Commands from Redis (best-effort)
82133
let commands: CommandFlags = {};
83134
if (runId) {
@@ -91,6 +142,37 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
91142
const redisMetrics = redisState.getMetrics();
92143

93144
const payload = {
145+
// ── Quick-glance summary ──────────────────────────────────────────
146+
summary: {
147+
overall: progressBar(overallPct),
148+
overallPct,
149+
currentCollection: progress.currentCollection
150+
? `${progress.currentCollection} ${progressBar(currentCollPct)}`
151+
: 'idle',
152+
currentCollectionPct: currentCollPct,
153+
collections: `${progress.completedCollections}/${progress.totalCollections} done`
154+
+ (progress.failedCollections > 0 ? `, ${progress.failedCollections} failed` : '')
155+
+ (progress.skippedCollections > 0 ? `, ${progress.skippedCollections} skipped` : ''),
156+
docsProgress: `${fmtNum(overallDocsRead)} / ~${fmtNum(totalEstimated)} docs`,
157+
throughput: `${fmtNum(Math.round(batchStats?.docsPerSecond ?? 0))} docs/s`,
158+
elapsed: fmtDuration(Date.now() - startedAt.getTime()),
159+
eta: etaMs !== null ? `~${fmtDuration(etaMs)}` : 'calculating...',
160+
status: runnerStatus,
161+
},
162+
163+
// ── Current collection detail ─────────────────────────────────────
164+
currentCollectionProgress: progress.currentCollection ? {
165+
collection: progress.currentCollection,
166+
estimated: currentCollEstimate,
167+
docsRead: currentCollDocsRead,
168+
rowsInserted: totalRowsInserted,
169+
pct: currentCollPct,
170+
bar: progressBar(currentCollPct),
171+
batchSeq: currentBatchSeq,
172+
skipRate: currentCollDocsRead > 0
173+
? `${((totalDocsSkipped / currentCollDocsRead) * 100).toFixed(1)}%` : '0%',
174+
} : null,
175+
94176
service: {
95177
name: config.service.name,
96178
version,
@@ -107,6 +189,13 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
107189
skippedCollections: progress.skippedCollections,
108190
currentCollection: progress.currentCollection,
109191
collections: progress.collections,
192+
collectionProgress: progress.results.map(r => ({
193+
collection: r.collection,
194+
status: r.status,
195+
estimated: estimatedCounts.get(r.collection) ?? null,
196+
docsRead: r.docsRead ?? null,
197+
rowsInserted: r.rowsInserted ?? null,
198+
})),
110199
},
111200
run: {
112201
sourceNs: runRecord?.source_ns ?? null,

src/main.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { MongoReader } from './source/mongo-reader.ts';
88
import { ClickHouseWriter } from './target/clickhouse-writer.ts';
99
import { ClickHousePressure } from './target/clickhouse-pressure.ts';
1010
import { CollectionOrchestrator } from './runtime/collection-orchestrator.ts';
11+
import { HashResolver } from './transform/hash-resolver.ts';
1112
import { RetryPolicy } from './runtime/retry-policy.ts';
1213
import { GcController, type GcConfig } from './runtime/gc-controller.ts';
1314
import { ProcessMetricsCollector } from './runtime/process-metrics.ts';
@@ -119,6 +120,14 @@ async function main(): Promise<void> {
119120
}
120121
logger.info('All external services connected');
121122

123+
// ── 4b. Build hash resolver for drill_events* collection defaults ────
124+
const hashResolver = new HashResolver(
125+
{ uri: config.source.uri, countlyDb: config.source.countlyDb },
126+
logger,
127+
);
128+
await hashResolver.build();
129+
logger.info({ hashEntries: hashResolver.size }, 'Hash resolver built');
130+
122131
// ── 5. Create CollectionOrchestrator ────────────────────────────────
123132
const orchestrator = new CollectionOrchestrator({
124133
manifestStore,
@@ -128,6 +137,7 @@ async function main(): Promise<void> {
128137
chPressure,
129138
gcController,
130139
retryPolicy,
140+
hashResolver,
131141
logger,
132142
config,
133143
});
@@ -227,6 +237,7 @@ async function main(): Promise<void> {
227237
await closeResource('ClickHouse', async () => { await chWriter.close(); await pressureClient.close(); });
228238
await closeResource('Redis', () => redisState.close());
229239
await closeResource('ManifestStore', () => manifestStore.close());
240+
await closeResource('HashResolver', () => hashResolver.close());
230241
processMetrics.stop();
231242
gcController.dispose();
232243

src/runtime/batch-runner.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type { GcController } from "./gc-controller.ts";
1212
import type { RetryPolicy } from "./retry-policy.ts";
1313
import { SkipCounter, type SkipReason } from "../transform/skip-reasons.ts";
1414
import { transformBatch, type OutputRow } from "../transform/normalize.ts";
15+
import type { CollectionDefaults } from "../transform/hash-resolver.ts";
1516
import { type Cursor, deserializeCursor, serializeCursor } from "../types/cursor.ts";
1617

1718
// ---------------------------------------------------------------------------
@@ -30,6 +31,7 @@ export interface BatchRunnerConfig {
3031
database: string;
3132
table: string;
3233
snapshotInterval: number;
34+
collectionDefaults?: CollectionDefaults;
3335
}
3436

3537
export interface BatchRunnerDeps {
@@ -283,6 +285,7 @@ export class BatchRunner {
283285
const { rows, skippedSamples } = transformBatch(
284286
page.docs,
285287
this.skipCounter,
288+
this.deps.config.collectionDefaults,
286289
);
287290

288291
// Record skip samples in manifest
@@ -732,7 +735,7 @@ export class BatchRunner {
732735
);
733736

734737
// Step 2: Re-transform
735-
const { rows } = transformBatch(page.docs, this.skipCounter);
738+
const { rows } = transformBatch(page.docs, this.skipCounter, this.deps.config.collectionDefaults);
736739

737740
// Step 3: Recompute digest
738741
const newDigest = computePayloadDigest(rows);

src/runtime/collection-orchestrator.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { RetryPolicy } from "./retry-policy.ts";
99
import { BatchRunner, type RunnerStatus, type BatchRunnerStats } from "./batch-runner.ts";
1010
import { resolveRun } from "./resolve-run.ts";
1111
import { discoverCollections } from "../source/discover-collections.ts";
12+
import type { HashResolver } from "../transform/hash-resolver.ts";
1213
import type { Config } from "../config/schema.ts";
1314

1415
// ---------------------------------------------------------------------------
@@ -21,6 +22,8 @@ export interface CollectionResult {
2122
runId: string;
2223
status: "completed" | "failed" | "skipped";
2324
error?: string;
25+
docsRead?: number;
26+
rowsInserted?: number;
2427
}
2528

2629
export interface OrchestratorResult {
@@ -48,6 +51,7 @@ export interface OrchestratorDeps {
4851
chPressure: ClickHousePressure;
4952
gcController: GcController;
5053
retryPolicy: RetryPolicy;
54+
hashResolver: HashResolver;
5155
logger: Logger;
5256
config: Config;
5357
}
@@ -62,6 +66,7 @@ export class CollectionOrchestrator {
6266

6367
private discoveredCollections: string[] = [];
6468
private results: CollectionResult[] = [];
69+
private estimatedCounts: Map<string, number> = new Map();
6570
private currentCollection: string | null = null;
6671
private currentRunId: string | null = null;
6772
private currentBatchRunner: BatchRunner | null = null;
@@ -221,6 +226,10 @@ export class CollectionOrchestrator {
221226
return this.currentBatchRunner?.getCurrentBatchSeq() ?? 0;
222227
}
223228

229+
getEstimatedCounts(): Map<string, number> {
230+
return new Map(this.estimatedCounts);
231+
}
232+
224233
// -----------------------------------------------------------------------
225234
// Private
226235
// -----------------------------------------------------------------------
@@ -237,6 +246,29 @@ export class CollectionOrchestrator {
237246
// Switch MongoReader to this collection (also ensures index)
238247
await mongoReader.switchCollection(collectionName);
239248

249+
// Get estimated document count for progress tracking
250+
const estimatedCount = await mongoReader.getEstimatedCount();
251+
this.estimatedCounts.set(collectionName, estimatedCount);
252+
this.logger.info({ collection: collectionName, estimatedCount }, "Estimated document count");
253+
254+
// Resolve collection defaults from hash (once per collection)
255+
const collectionDefaults = this.deps.hashResolver.resolveCollectionName(
256+
collectionName,
257+
config.source.collectionPrefix,
258+
);
259+
260+
if (collectionDefaults) {
261+
this.logger.info(
262+
{ collection: collectionName, appId: collectionDefaults.a, event: collectionDefaults.e },
263+
"Resolved collection hash defaults",
264+
);
265+
} else if (collectionName !== config.source.collectionPrefix) {
266+
this.logger.warn(
267+
{ collection: collectionName },
268+
"No hash match found for collection — documents missing a/e will be skipped",
269+
);
270+
}
271+
240272
// Create per-collection RedisHotState with isolated key prefix
241273
const redisClient = this.deps.redisState.getRedisClient();
242274
const collectionPrefix = `${config.state.redisKeyPrefix}:${collectionName}`;
@@ -291,6 +323,7 @@ export class CollectionOrchestrator {
291323
database: config.target.db,
292324
table: config.target.table,
293325
snapshotInterval: config.state.timelineSnapshotInterval,
326+
collectionDefaults: collectionDefaults ?? undefined,
294327
},
295328
});
296329

@@ -301,19 +334,30 @@ export class CollectionOrchestrator {
301334
await batchRunner.run();
302335

303336
const finalStatus = batchRunner.getStatus();
337+
const finalStats = batchRunner.getStats();
304338
if (finalStatus === "completed") {
305-
return { collection: collectionName, sourceNs, runId, status: "completed" };
339+
return {
340+
collection: collectionName, sourceNs, runId, status: "completed",
341+
docsRead: finalStats.totalDocsRead,
342+
rowsInserted: finalStats.totalRowsInserted,
343+
};
306344
}
307345
if (finalStatus === "stopped" || finalStatus === "stopping") {
308346
// Stopped by operator — treat as incomplete, not failed
309-
return { collection: collectionName, sourceNs, runId, status: "failed", error: "Stopped by operator" };
347+
return {
348+
collection: collectionName, sourceNs, runId, status: "failed", error: "Stopped by operator",
349+
docsRead: finalStats.totalDocsRead,
350+
rowsInserted: finalStats.totalRowsInserted,
351+
};
310352
}
311353
return {
312354
collection: collectionName,
313355
sourceNs,
314356
runId,
315357
status: "failed",
316358
error: `BatchRunner ended with status: ${finalStatus}`,
359+
docsRead: finalStats.totalDocsRead,
360+
rowsInserted: finalStats.totalRowsInserted,
317361
};
318362
} catch (err) {
319363
const error = err instanceof Error ? err.message : String(err);

0 commit comments

Comments
 (0)