Skip to content

Commit affef02

Browse files
committed
Add TTL tracking, danger zone controls, and cluster-wide progress enhancements
- Introduced TTL tracking for Redis locks with detailed expiration handling in `/viz` route.
- Added "Danger Zone" actions for clearing MongoDB and Redis migration state, with confirmation prompts.
- Enhanced `/stats` route to compute cluster-wide progress and ETA based on multi-pod data.
- Enabled upsert functionality in MongoDB writes to better handle duplicates.
- Improved run management by cleaning old run data before starting fresh.
- Refined progress display, throughput calculations, and cluster-wide metrics visibility in multi-pod mode.
1 parent 8e3eac8 commit affef02

8 files changed

Lines changed: 201 additions & 26 deletions

File tree

src/http/control-route.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { MongoReader } from '../source/mongo-reader.ts';
66
import type { ClickHouseWriter } from '../target/clickhouse-writer.ts';
77
import type { GlobalProgress } from '../state/global-progress.ts';
88
import type { CollectionLock } from '../state/collection-lock.ts';
9+
import type { RedisHotState } from '../state/redis-hot-state.ts';
910
import { transformBatch } from '../transform/normalize.ts';
1011
import { SkipCounter } from '../transform/skip-reasons.ts';
1112
import { deserializeCursor } from '../types/cursor.ts';
@@ -28,6 +29,7 @@ export interface ControlDeps {
2829
chWriter: ClickHouseWriter;
2930
globalProgress?: GlobalProgress;
3031
collectionLock?: CollectionLock;
32+
redisState?: RedisHotState;
3133
}
3234

3335
interface GcRequestBody {
@@ -303,4 +305,56 @@ export function registerControlRoutes(app: FastifyInstance, deps: ControlDeps):
303305
orchestrator.stopAfterBatch();
304306
return reply.status(200).send({ ok: true, message: 'Drain initiated — finishing current batch then releasing locks' });
305307
});
308+
309+
// ── DANGER ZONE ────────────────────────────────────────────────────
310+
311+
// POST /control/danger/clear-mongodb - drop all migration state from MongoDB
312+
app.post('/control/danger/clear-mongodb', async (_request, reply) => {
313+
const { manifestStore: ms } = deps;
314+
try {
315+
const runs = await ms.listRuns({ limit: 1000 });
316+
let totalDeleted = 0;
317+
for (const run of runs.runs) {
318+
totalDeleted += await ms.deleteRunData(run.run_id);
319+
}
320+
// Also delete the run records themselves
321+
const db = (ms as any).client.db((ms as any).dbName);
322+
const runResult = await db.collection('mig_runs').deleteMany({});
323+
totalDeleted += runResult.deletedCount ?? 0;
324+
return reply.status(200).send({
325+
ok: true,
326+
message: `Cleared MongoDB migration state: ${totalDeleted} records deleted`,
327+
deletedRecords: totalDeleted,
328+
});
329+
} catch (err) {
330+
return reply.status(500).send({ ok: false, error: err instanceof Error ? err.message : String(err) });
331+
}
332+
});
333+
334+
// POST /control/danger/clear-redis - flush all migration keys from Redis
335+
app.post('/control/danger/clear-redis', async (_request, reply) => {
336+
const { redisState: rs } = deps;
337+
if (!rs) {
338+
return reply.status(400).send({ ok: false, error: 'Redis not available' });
339+
}
340+
try {
341+
const redis = rs.getRedisClient();
342+
const keys = await (async () => {
343+
const found: string[] = [];
344+
const stream = redis.scanStream({ match: 'mig:*', count: 500 });
345+
for await (const batch of stream) found.push(...(batch as string[]));
346+
return found;
347+
})();
348+
if (keys.length > 0) {
349+
await redis.unlink(...keys);
350+
}
351+
return reply.status(200).send({
352+
ok: true,
353+
message: `Cleared Redis migration state: ${keys.length} keys deleted`,
354+
deletedKeys: keys.length,
355+
});
356+
} catch (err) {
357+
return reply.status(500).send({ ok: false, error: err instanceof Error ? err.message : String(err) });
358+
}
359+
});
306360
}

src/http/stats-route.ts

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,8 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
133133
const overallPct = totalEstimated > 0
134134
? Math.min(100, Math.round((overallDocsRead / totalEstimated) * 100)) : 0;
135135

136-
// ETA based on elapsed time and progress percentage
137-
const etaMs = overallPct > 0 && overallPct < 100 && elapsedMs > 0
138-
? Math.round(((Date.now() - startedAt.getTime()) / overallPct) * (100 - overallPct))
139-
: null;
136+
// ETA computed later using cluster-wide data
137+
let etaMs: number | null = null;
140138

141139
// Commands from Redis (best-effort)
142140
let commands: CommandFlags = {};
@@ -240,20 +238,58 @@ export function registerStatsRoute(app: FastifyInstance, deps: StatsDeps): void
240238
};
241239
});
242240

241+
// ── Compute cluster-wide aggregates for summary ─────────────────
242+
const clusterDocsRead = clusterData
243+
? mergedCollectionProgress.reduce((s, c) => s + (c.docsRead ?? 0), 0)
244+
: overallDocsRead;
245+
const clusterEstimated = clusterData
246+
? mergedCollectionProgress.reduce((s, c) => s + (c.estimated ?? 0), 0) || totalEstimated
247+
: totalEstimated;
248+
const clusterPct = clusterEstimated > 0
249+
? Math.min(100, Math.round((clusterDocsRead / clusterEstimated) * 100))
250+
: overallPct;
251+
const clusterDone = clusterData
252+
? mergedCollectionProgress.filter(c => c.status === 'completed' || c.status === 'skipped').length
253+
: progress.completedCollections + progress.skippedCollections;
254+
const clusterFailed = clusterData
255+
? mergedCollectionProgress.filter(c => c.status === 'failed').length
256+
: progress.failedCollections;
257+
const clusterProcessing = clusterData
258+
? mergedCollectionProgress.filter(c => c.status === 'processing').length
259+
: (progress.currentCollection ? 1 : 0);
260+
const clusterTotal = clusterData
261+
? (progress.totalCollections || mergedCollectionProgress.length)
262+
: progress.totalCollections;
263+
264+
// ETA based on cluster-wide progress
265+
const serviceElapsedMs = Date.now() - startedAt.getTime();
266+
etaMs = clusterPct > 0 && clusterPct < 100 && serviceElapsedMs > 0
267+
? Math.round((serviceElapsedMs / clusterPct) * (100 - clusterPct))
268+
: null;
269+
243270
const payload = {
244-
// ── Quick-glance summary ──────────────────────────────────────────
271+
// ── Quick-glance summary (cluster-wide when multi-pod) ──────────
245272
summary: {
246-
overall: progressBar(overallPct),
247-
overallPct,
273+
overall: progressBar(clusterPct),
274+
overallPct: clusterPct,
248275
currentCollection: progress.currentCollection
249276
? `${progress.currentCollection} ${progressBar(currentCollPct)}`
250277
: 'idle',
251278
currentCollectionPct: currentCollPct,
252-
collections: `${progress.completedCollections}/${progress.totalCollections} done`
253-
+ (progress.failedCollections > 0 ? `, ${progress.failedCollections} failed` : '')
254-
+ (progress.skippedCollections > 0 ? `, ${progress.skippedCollections} skipped` : ''),
255-
docsProgress: `${fmtNum(overallDocsRead)} / ~${fmtNum(totalEstimated)} docs`,
256-
throughput: `${fmtNum(Math.round(batchStats?.docsPerSecond ?? 0))} docs/s`,
279+
collections: `${clusterDone}/${clusterTotal} done`
280+
+ (clusterFailed > 0 ? `, ${clusterFailed} failed` : '')
281+
+ (clusterProcessing > 0 ? `, ${clusterProcessing} processing` : ''),
282+
docsProgress: `${fmtNum(clusterDocsRead)} / ~${fmtNum(clusterEstimated)} docs`,
283+
throughput: (() => {
284+
// In multi-pod mode, compute cluster-wide throughput from total docs / uptime
285+
const clusterDocs = clusterData
286+
? mergedCollectionProgress.reduce((s, c) => s + (c.docsRead ?? 0), 0)
287+
: 0;
288+
if (clusterDocs > 0 && uptimeSec > 0) {
289+
return `${fmtNum(Math.round(clusterDocs / uptimeSec))} docs/s`;
290+
}
291+
return `${fmtNum(Math.round(batchStats?.docsPerSecond ?? 0))} docs/s`;
292+
})(),
257293
elapsed: (() => {
258294
const isTerminal = runnerStatus === 'completed' || runnerStatus === 'failed' || runnerStatus === 'stopped';
259295
if (isTerminal && frozenElapsedMs === null) {

src/http/viz-route.ts

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
295295
<h3 style="margin-top:12px">Active Locks <span id="lock-count" style="font-weight:400;color:var(--text2)"></span></h3>
296296
<div class="coll-scroll" style="max-height:160px">
297297
<table class="coll-table">
298-
<thead><tr><th>Collection</th><th>Pod</th><th>Acquired</th><th>Action</th></tr></thead>
298+
<thead><tr><th>Collection</th><th>Pod</th><th>Acquired</th><th>Expires In</th><th>Action</th></tr></thead>
299299
<tbody id="lock-body"></tbody>
300300
</table>
301301
</div>
@@ -346,6 +346,17 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
346346
</div>
347347
</div>
348348
349+
<!-- Danger Zone -->
350+
<div class="card full" style="margin-top:24px;border-color:var(--red)">
351+
<h3 style="color:var(--red)">Danger Zone</h3>
352+
<p style="font-size:11px;color:var(--text2);margin-bottom:12px">These actions are irreversible and will destroy migration state. Use with caution.</p>
353+
<div style="display:flex;gap:12px;flex-wrap:wrap">
354+
<button class="btn" id="btn-clear-mongo" style="padding:6px 16px;font-size:12px;border-color:var(--red);color:var(--red)">Clear MongoDB State</button>
355+
<button class="btn" id="btn-clear-redis" style="padding:6px 16px;font-size:12px;border-color:var(--red);color:var(--red)">Clear Redis State</button>
356+
</div>
357+
<div id="danger-msg" style="margin-top:8px;font-size:11px"></div>
358+
</div>
359+
349360
<script>
350361
(function() {
351362
'use strict';
@@ -386,6 +397,30 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
386397
$('btn-global-resume').addEventListener('click', function() { doControl('global/resume'); });
387398
$('btn-global-stop').addEventListener('click', function() { doControl('global/stop'); });
388399
400+
// Danger zone buttons
401+
$('btn-clear-mongo').addEventListener('click', function() {
402+
if (!confirm('WARNING: This will permanently delete ALL migration runs, batches, and events from MongoDB. This cannot be undone. Continue?')) return;
403+
var msg = $('danger-msg');
404+
fetch('/control/danger/clear-mongodb', { method: 'POST' })
405+
.then(function(r) { return r.json(); })
406+
.then(function(d) {
407+
msg.textContent = d.ok ? 'MongoDB cleared: ' + d.deletedRecords + ' records deleted' : 'Error: ' + d.error;
408+
msg.style.color = d.ok ? 'var(--green)' : 'var(--red)';
409+
})
410+
.catch(function(e) { msg.textContent = 'Error: ' + e.message; msg.style.color = 'var(--red)'; });
411+
});
412+
$('btn-clear-redis').addEventListener('click', function() {
413+
if (!confirm('WARNING: This will permanently delete ALL migration keys from Redis (cursors, bitmaps, live state, locks, heartbeats). This cannot be undone. Continue?')) return;
414+
var msg = $('danger-msg');
415+
fetch('/control/danger/clear-redis', { method: 'POST' })
416+
.then(function(r) { return r.json(); })
417+
.then(function(d) {
418+
msg.textContent = d.ok ? 'Redis cleared: ' + d.deletedKeys + ' keys deleted' : 'Error: ' + d.error;
419+
msg.style.color = d.ok ? 'var(--green)' : 'var(--red)';
420+
})
421+
.catch(function(e) { msg.textContent = 'Error: ' + e.message; msg.style.color = 'var(--red)'; });
422+
});
423+
389424
function statusClass(s) {
390425
if (s === 'running' || s === 'stopping') return 'status-running';
391426
if (s === 'waiting_for_index') return 'status-waiting_for_index';
@@ -402,7 +437,6 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
402437
}
403438
404439
function truncHash(name) {
405-
if (name && name.length > 50) return name.slice(0, 20) + '...' + name.slice(-8);
406440
return name || '-';
407441
}
408442
@@ -421,6 +455,7 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
421455
filtered = cp;
422456
}
423457
458+
filtered.sort(function(a, b) { return (b.estimated || 0) - (a.estimated || 0); });
424459
if (filtered.length > 200) filtered = filtered.slice(0, 200);
425460
426461
var tbody = $('coll-body');
@@ -447,13 +482,9 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
447482
448483
// Collection name
449484
var td1 = document.createElement('td');
450-
td1.style.maxWidth = '200px';
451-
td1.style.overflow = 'hidden';
452-
td1.style.textOverflow = 'ellipsis';
453-
td1.style.whiteSpace = 'nowrap';
454485
td1.style.fontSize = '11px';
455-
td1.setAttribute('title', c.collection || '');
456-
td1.textContent = truncHash(c.collection);
486+
td1.style.wordBreak = 'break-all';
487+
td1.textContent = c.collection || '-';
457488
tr.appendChild(td1);
458489
459490
// Status tag
@@ -841,7 +872,7 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
841872
if (locks.length === 0) {
842873
var noLockRow = document.createElement('tr');
843874
var noLockCell = document.createElement('td');
844-
noLockCell.setAttribute('colspan', '4');
875+
noLockCell.setAttribute('colspan', '5');
845876
noLockCell.className = 'muted';
846877
noLockCell.style.textAlign = 'center';
847878
noLockCell.textContent = 'No active locks';
@@ -864,6 +895,20 @@ const DASHBOARD_HTML = /* html */ `<!DOCTYPE html>
864895
lkTd3.style.cssText = 'font-size:10px;color:var(--text2)';
865896
lkTd3.textContent = lk.acquiredAt ? new Date(lk.acquiredAt).toLocaleTimeString() : '-';
866897
lkRow.appendChild(lkTd3);
898+
var lkTdTtl = document.createElement('td');
899+
lkTdTtl.style.cssText = 'font-size:10px;color:var(--text2)';
900+
var ttl = lk.ttlSec;
901+
if (ttl > 0) {
902+
var ttlMin = Math.floor(ttl / 60);
903+
var ttlSec = ttl % 60;
904+
lkTdTtl.textContent = ttlMin > 0 ? ttlMin + 'm ' + ttlSec + 's' : ttlSec + 's';
905+
if (ttl < 60) lkTdTtl.style.color = 'var(--red)';
906+
else if (ttl < 120) lkTdTtl.style.color = 'var(--yellow)';
907+
} else {
908+
lkTdTtl.textContent = 'no TTL';
909+
lkTdTtl.style.color = 'var(--red)';
910+
}
911+
lkRow.appendChild(lkTdTtl);
867912
var lkTd4 = document.createElement('td');
868913
var relBtn = document.createElement('button');
869914
relBtn.className = 'btn';

src/main.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ async function main(): Promise<void> {
229229
chWriter,
230230
globalProgress,
231231
collectionLock,
232+
redisState,
232233
});
233234

234235
registerRunRoutes(app, {

src/runtime/resolve-run.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ export async function resolveRun(opts: ResolveRunOpts): Promise<ResolvedRun> {
111111
const upperBoundId = serializeCursor(upperBound);
112112
const activeRun = await manifestStore.getActiveRun(sourceNs, targetTable);
113113
if (activeRun) {
114+
// Clean old run data before starting fresh
115+
const deleted = await manifestStore.deleteRunData(activeRun.run_id);
116+
logger.info({ oldRunId: activeRun.run_id, deletedRecords: deleted }, 'Cleaned old run data for fresh start');
114117
await manifestStore.updateRunStatus(activeRun.run_id, 'completed');
115118
}
116119
await createNewRun(runId, upperBoundId);
@@ -123,6 +126,9 @@ export async function resolveRun(opts: ResolveRunOpts): Promise<ResolvedRun> {
123126
if (!activeRun) throw new Error('No existing run to clone from');
124127
const runId = randomUUID();
125128
const upperBoundId = activeRun.upper_bound_cursor;
129+
// Clean old run data before starting fresh
130+
const deleted = await manifestStore.deleteRunData(activeRun.run_id);
131+
logger.info({ oldRunId: activeRun.run_id, deletedRecords: deleted }, 'Cleaned old run data for fresh start');
126132
await manifestStore.updateRunStatus(activeRun.run_id, 'completed');
127133
await createNewRun(runId, upperBoundId);
128134
logger.info({ runId, upperBoundId, sourceNs, targetTable }, 'Created new migration run (clone-run mode)');

src/state/async-batch-writer.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ export class AsyncBatchWriter {
149149
}
150150

151151
try {
152-
// Bulk insert batch records
152+
// Bulk upsert batch records (handles duplicates via update)
153153
await this.manifestStore.bulkInsertBatches(batchDocs);
154154

155155
// Advance cursor to latest position
@@ -160,6 +160,7 @@ export class AsyncBatchWriter {
160160
"Async flush completed",
161161
);
162162
} catch (err) {
163+
const errMsg = err instanceof Error ? err.message : String(err);
163164
// Re-queue failed items (capped to prevent unbounded growth)
164165
const maxDepth = this.config.maxQueueDepth ?? 1000;
165166
const totalAfterRequeue = this.queue.length + items.length;
@@ -174,7 +175,7 @@ export class AsyncBatchWriter {
174175
);
175176
}
176177
this.logger.warn(
177-
{ error: err instanceof Error ? err.message : String(err), count: items.length, queueDepth: this.queue.length },
178+
{ error: errMsg, count: items.length, queueDepth: this.queue.length },
178179
"Async flush to MongoDB failed, re-queued",
179180
);
180181
} finally {

src/state/collection-lock.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export interface LockInfo {
1111
collectionName: string;
1212
podId: string;
1313
acquiredAt: string;
14+
ttlSec: number;
1415
}
1516

1617
export interface CollectionLockConfig {
@@ -224,19 +225,28 @@ export class CollectionLock {
224225
}
225226
if (keys.length === 0) return [];
226227

227-
const values = await this.redis.mget(...keys);
228+
// Fetch each key's value and TTL together via one pipeline (GET + TTL queued per key, single exec)
229+
const pipeline = this.redis.pipeline();
230+
for (const key of keys) {
231+
pipeline.get(key);
232+
pipeline.ttl(key);
233+
}
234+
const results = await pipeline.exec();
235+
228236
const prefix = `${this.config.keyPrefix}:lock:`;
229237
const locks: LockInfo[] = [];
230238

231239
for (let i = 0; i < keys.length; i++) {
232-
const val = values[i];
240+
const val = results?.[i * 2]?.[1] as string | null;
241+
const ttl = results?.[i * 2 + 1]?.[1] as number ?? -1;
233242
if (!val) continue;
234243
try {
235244
const data = JSON.parse(val) as { podId: string; acquiredAt: string };
236245
locks.push({
237246
collectionName: keys[i].slice(prefix.length),
238247
podId: data.podId,
239248
acquiredAt: data.acquiredAt,
249+
ttlSec: ttl,
240250
});
241251
} catch {
242252
// skip malformed entries

0 commit comments

Comments
 (0)