Skip to content

Commit baf667e

Browse files
Merge pull request #2 from Countly/fix/deadlock
deadlock on collection pickup fix
2 parents 28f88e8 + 817c01b commit baf667e

6 files changed

Lines changed: 1524 additions & 57 deletions

File tree

src/runtime/range-coordinator.ts

Lines changed: 95 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -392,82 +392,124 @@ export class RangeCoordinator {
392392
// Try to become the coordinator via SETNX
393393
const acquired = await redis.set(this.initKey, config.podId, "EX", 60, "NX");
394394
if (!acquired) {
395+
// Check if the lock holder is still alive; reclaim if dead
396+
const lockHolder = await redis.get(this.initKey);
397+
if (lockHolder) {
398+
const podKeyPrefix = `${config.redisKeyPrefix}:pod:`;
399+
const holderAlive = await redis.exists(`${podKeyPrefix}${lockHolder}`);
400+
if (!holderAlive) {
401+
await redis.del(this.initKey);
402+
this.logger.info(
403+
{ stalePod: lockHolder },
404+
"Reclaimed stale range init lock from dead pod",
405+
);
406+
// Retry acquisition after reclaim
407+
return this.initRanges();
408+
}
409+
}
410+
395411
this.logger.info("Another pod is initializing ranges, waiting...");
396412
for (let i = 0; i < 30; i++) {
397413
await new Promise(r => setTimeout(r, 2_000));
398414
const rid = await redis.get(this.runIdKey);
399415
if (rid) return rid;
416+
417+
// Re-check pod liveness periodically to avoid waiting for a dead pod
418+
const currentHolder = await redis.get(this.initKey);
419+
if (currentHolder) {
420+
const podKeyPrefix = `${config.redisKeyPrefix}:pod:`;
421+
const stillAlive = await redis.exists(`${podKeyPrefix}${currentHolder}`);
422+
if (!stillAlive) {
423+
await redis.del(this.initKey);
424+
this.logger.info(
425+
{ stalePod: currentHolder },
426+
"Reclaimed stale range init lock from dead pod during wait",
427+
);
428+
return this.initRanges();
429+
}
430+
} else {
431+
// initKey expired (TTL), retry
432+
return this.initRanges();
433+
}
400434
}
401435
throw new Error("Timed out waiting for range initialization");
402436
}
403437

404-
// We are the coordinator — query min/max cd
405-
const lowerBound = await mongoReader.getLowerBound();
406-
const upperBound = await mongoReader.getUpperBound();
407-
if (!lowerBound || !upperBound) {
408-
const hasNullCd = await mongoReader.hasNullCdDocuments();
409-
if (!hasNullCd) {
410-
throw new Error("Collection is empty, cannot initialize ranges");
438+
// We are the coordinator — wrapped in try/catch to release initKey on failure
439+
try {
440+
const lowerBound = await mongoReader.getLowerBound();
441+
const upperBound = await mongoReader.getUpperBound();
442+
if (!lowerBound || !upperBound) {
443+
const hasNullCd = await mongoReader.hasNullCdDocuments();
444+
if (!hasNullCd) {
445+
throw new Error("Collection is empty, cannot initialize ranges");
446+
}
447+
const runId = randomUUID();
448+
const now = new Date().toISOString();
449+
await manifestStore.createRun({
450+
run_id: runId,
451+
status: "active",
452+
source_ns: config.sourceNs,
453+
target_table: config.targetTable,
454+
upper_bound_cursor: serializeCursor({ cd: 0, id: "\uffff".repeat(24) }),
455+
transform_version: config.transformVersion,
456+
created_at: now,
457+
});
458+
await redis.set(this.runIdKey, runId);
459+
await redis.del(this.initKey);
460+
this.logger.info({ runId }, "All-null collection — skipping cd-ranges, will sweep via null-cd phase");
461+
return runId;
411462
}
463+
464+
const minCd = lowerBound.cd;
465+
const maxCd = upperBound.cd;
466+
const rangeCount = config.rangeCount;
467+
const spanMs = maxCd - minCd;
468+
const stepMs = Math.max(1, Math.ceil(spanMs / rangeCount));
469+
470+
// Create shared run in ManifestStore
412471
const runId = randomUUID();
413472
const now = new Date().toISOString();
414473
await manifestStore.createRun({
415474
run_id: runId,
416475
status: "active",
417476
source_ns: config.sourceNs,
418477
target_table: config.targetTable,
419-
upper_bound_cursor: serializeCursor({ cd: 0, id: "\uffff".repeat(24) }),
478+
upper_bound_cursor: serializeCursor(upperBound),
420479
transform_version: config.transformVersion,
421480
created_at: now,
422481
});
423-
await redis.set(this.runIdKey, runId);
424-
this.logger.info({ runId }, "All-null collection — skipping cd-ranges, will sweep via null-cd phase");
425-
return runId;
426-
}
427482

428-
const minCd = lowerBound.cd;
429-
const maxCd = upperBound.cd;
430-
const rangeCount = config.rangeCount;
431-
const spanMs = maxCd - minCd;
432-
const stepMs = Math.max(1, Math.ceil(spanMs / rangeCount));
433-
434-
// Create shared run in ManifestStore
435-
const runId = randomUUID();
436-
const now = new Date().toISOString();
437-
await manifestStore.createRun({
438-
run_id: runId,
439-
status: "active",
440-
source_ns: config.sourceNs,
441-
target_table: config.targetTable,
442-
upper_bound_cursor: serializeCursor(upperBound),
443-
transform_version: config.transformVersion,
444-
created_at: now,
445-
});
483+
// Create all ranges in Redis (atomic pipeline)
484+
const pipeline = redis.multi();
485+
for (let i = 0; i < rangeCount; i++) {
486+
const startCd = minCd + (i * stepMs);
487+
const endCd = i === rangeCount - 1 ? maxCd : minCd + ((i + 1) * stepMs);
488+
const entry: RangeEntry = {
489+
idx: i,
490+
startCd,
491+
endCd,
492+
status: "pending",
493+
podId: null,
494+
claimedAt: null,
495+
};
496+
pipeline.hset(this.rangesKey, String(i), JSON.stringify(entry));
497+
}
498+
pipeline.set(this.runIdKey, runId);
499+
pipeline.set(this.metaKey, JSON.stringify({ minCd, maxCd, rangeCount, createdAt: now }));
500+
pipeline.del(this.initKey);
501+
await pipeline.exec();
446502

447-
// Create all ranges in Redis (atomic pipeline)
448-
const pipeline = redis.multi();
449-
for (let i = 0; i < rangeCount; i++) {
450-
const startCd = minCd + (i * stepMs);
451-
const endCd = i === rangeCount - 1 ? maxCd : minCd + ((i + 1) * stepMs);
452-
const entry: RangeEntry = {
453-
idx: i,
454-
startCd,
455-
endCd,
456-
status: "pending",
457-
podId: null,
458-
claimedAt: null,
459-
};
460-
pipeline.hset(this.rangesKey, String(i), JSON.stringify(entry));
503+
this.logger.info(
504+
{ runId, minCd: new Date(minCd).toISOString(), maxCd: new Date(maxCd).toISOString(), rangeCount, stepMs },
505+
"Ranges initialized",
506+
);
507+
return runId;
508+
} catch (err) {
509+
// Release initKey so other pods (or retries) aren't blocked
510+
await redis.del(this.initKey).catch(() => {});
511+
throw err;
461512
}
462-
pipeline.set(this.runIdKey, runId);
463-
pipeline.set(this.metaKey, JSON.stringify({ minCd, maxCd, rangeCount, createdAt: now }));
464-
await pipeline.exec();
465-
466-
this.logger.info(
467-
{ runId, minCd: new Date(minCd).toISOString(), maxCd: new Date(maxCd).toISOString(), rangeCount, stepMs },
468-
"Ranges initialized",
469-
);
470-
return runId;
471513
}
472514

473515
private async claimNextRange(): Promise<RangeEntry | null> {

tests/helpers/seed-mongo.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ export interface SeedOptions {
3131
createIndex?: boolean;
3232
/** Fraction of docs with cd set to null (0–1). Default 0. */
3333
nullCdFraction?: number;
34+
/** Fraction of docs with invalid ts (ts=0, will be skipped). Default 0. */
35+
invalidTsFraction?: number;
3436
}
3537

3638
// ---------------------------------------------------------------------------
@@ -123,18 +125,19 @@ export async function seedCollection(opts: SeedOptions): Promise<{
123125
const migrated = Math.random() < migratedFrac;
124126
const missingUid = !migrated && Math.random() < missingUidFrac;
125127
const nullCd = !migrated && !missingUid && Math.random() < (opts.nullCdFraction ?? 0);
128+
const invalidTs = !migrated && !missingUid && !nullCd && Math.random() < (opts.invalidTsFraction ?? 0);
126129

127130
docs.push(makeDoc({
128131
appId,
129132
eventName,
130-
ts,
133+
ts: invalidTs ? 0 : ts,
131134
cd: nullCd ? null as any : cd,
132135
idx: i,
133136
migrated,
134137
missingUid,
135138
}));
136139

137-
if (!migrated && !missingUid) {
140+
if (!migrated && !missingUid && !invalidTs) {
138141
expectedRows++;
139142
}
140143
}

tests/integration/cursor-isolation.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ describe("cursor-isolation (range-parallel)", () => {
354354
const range3EndDate = new Date(range3EndMs + 1000).toISOString().replace("T", " ").replace("Z", "");
355355

356356
const rowsInRange = await chRowCount(
357-
`cd >= '${range3StartDate}' AND cd <= '${range3EndDate}'`,
357+
`ts >= '${range3StartDate}' AND ts <= '${range3EndDate}'`,
358358
);
359359

360360
// All rows should be within range 3's window (the runner should not have
@@ -482,7 +482,7 @@ describe("cursor-isolation (range-parallel)", () => {
482482
.toISOString().replace("T", " ").replace("Z", "");
483483

484484
const rowsInRange = await chRowCount(
485-
`cd >= '${range1StartDate}' AND cd <= '${range1EndDate}'`,
485+
`ts >= '${range1StartDate}' AND ts <= '${range1EndDate}'`,
486486
);
487487
expect(rowsInRange).toBe(totalRows);
488488
});

0 commit comments

Comments
 (0)