Skip to content

Commit eb5e359

Browse files
committed
Improve progress calculations and merge cluster data for migration stats
- Updated `/stats` route to compute collection progress dynamically with cluster-wide data integration. - Enhanced completed collection recovery using Redis aggregates to include expired progress data. - Refactored progress tracking to handle local and merged cluster metrics for accurate reporting. - Improved `CollectionOrchestrator` by adding fallback logic for `batchSeq` initialization.
1 parent 065ad6e commit eb5e359

2 files changed

Lines changed: 26 additions & 7 deletions

File tree

src/http/stats-route.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,11 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
119119
const elapsedMs = batchStats?.elapsedMs ?? 0;
120120
const elapsedSec = elapsedMs / 1000;
121121

122-
// ── Progress calculations ───────────────────────────────────────────
122+
// ── Progress calculations (local only; cluster merge happens after clusterData fetch) ──
123123
const currentCollEstimate = progress.currentCollection
124124
? estimatedCounts.get(progress.currentCollection) ?? 0 : 0;
125-
const currentCollDocsRead = totalDocsRead;
126-
const currentCollPct = currentCollEstimate > 0
127-
? Math.min(100, Math.round((currentCollDocsRead / currentCollEstimate) * 100)) : 0;
125+
let currentCollDocsRead = totalDocsRead;
126+
let currentCollPct = 0;
128127

129128
// Overall progress — recomputed after mergedCollectionProgress is built
130129
const totalEstimated = Array.from(estimatedCounts.values()).reduce((a, b) => a + b, 0);
@@ -181,6 +180,19 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
181180
completedAggregates = await redisState.getAllCollectionCompleted();
182181
} catch { /* best-effort */ }
183182

183+
// ── Finalize current collection progress (merge cluster data) ──
184+
if (clusterData && progress.currentCollection) {
185+
const remoteProgress = clusterData.collectionProgress
186+
.filter(p => p.collectionName === progress.currentCollection);
187+
for (const rp of remoteProgress) {
188+
if (rp.podId !== config.worker.podId) {
189+
currentCollDocsRead += (rp.docsRead ?? 0);
190+
}
191+
}
192+
}
193+
currentCollPct = currentCollEstimate > 0
194+
? Math.min(100, Math.round((currentCollDocsRead / currentCollEstimate) * 100)) : 0;
195+
184196
// ── Sliding window throughput (best-effort) ──────────────────────
185197
let slidingThroughput: number | null = null;
186198
if (runId) {
@@ -527,8 +539,15 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
527539
const pending = mergedCollectionProgress.filter(c => c.status === 'pending').length;
528540
const total = progress.totalCollections || mergedCollectionProgress.length;
529541
const pct = total > 0 ? Math.min(100, Math.round((done / total) * 100)) : 0;
530-
const docsRead = mergedCollectionProgress.reduce((s, c) => s + (c.docsRead ?? 0), 0);
531-
const rowsInserted = mergedCollectionProgress.reduce((s, c) => s + (c.rowsInserted ?? 0), 0);
542+
let docsRead = mergedCollectionProgress.reduce((s, c) => s + (c.docsRead ?? 0), 0);
543+
let rowsInserted = mergedCollectionProgress.reduce((s, c) => s + (c.rowsInserted ?? 0), 0);
544+
// Include completed collections whose volatile progress:* TTL expired
545+
for (const [collection, agg] of completedAggregates) {
546+
if (!mergedCollectionProgress.some(c => c.collection === collection)) {
547+
docsRead += agg.docsRead;
548+
rowsInserted += agg.rowsInserted;
549+
}
550+
}
532551
const estimated = mergedCollectionProgress.reduce((s, c) => s + (c.estimated ?? 0), 0);
533552
return { total, done, failed, processing, pending, pct, docsRead, rowsInserted, estimated };
534553
})() : null,

src/runtime/collection-orchestrator.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ export class CollectionOrchestrator {
406406
const elapsedSec = elapsedMs / 1000;
407407
return {
408408
status: "running",
409-
batchSeq: 0,
409+
batchSeq: active?.batchSeq ?? 0,
410410
lastCommittedId: null,
411411
totalDocsRead,
412412
totalRowsInserted,

0 commit comments

Comments
 (0)