Boundary 2 (Shard Coordination Protocol) manages distributed shard lifecycle,
lease-based ownership, and bounded idempotent operations for the gossip-rs
secret scanner. The shared data model (shard spec, cursor, pooled wrappers,
manifest validation, split-replace planning core) lives in
crates/gossip-contracts/src/coordination/, and the protocol layer
(traits, state machine, InMemoryCoordinator, sim harness, async trait
variants) lives in crates/gossip-coordination/src/. Both depend on Boundary 1
(Identity & Hashing Spine) for TenantId, ShardId, RunId, OpId,
FenceEpoch, LogicalTime, and CanonicalBytes.
The durable backend lives in crates/gossip-coordination-etcd/.
It provides two entrypoints: EtcdCoordinator (sync wrapper owning a
single-threaded Tokio runtime) and AsyncEtcdCoordinator (async core
for callers that already have a runtime). Both share the same coordination
logic, static helpers, and validation code.
The backend owns a real etcd client connection, a deterministic keyspace
layout with typed exact-key wrappers (keyspace.rs), and an explicit binary
codec (codec.rs) for
persisting coordination records. It persists create_run, register_shards,
get_run* queries, claim_next_available, and the fenced
acquire_and_restore_into / renew / checkpoint / split_replace /
split_residual hot path directly in etcd.
Shard owner bindings live in separate etcd lease-backed keys so storage-layer
liveness and logical lease deadlines are checked together before accepting
progress updates. split_replace also enforces a backend-local
max_children_per_op cap so one atomic child publication stays bounded.
Each split_replace transaction uses 4 + 2N writes and 5 + N compares
(total 9 + 3N txn-ops, where N = children). The default cap of 8 yields
33 total ops; operators raising the cap must verify the total stays within
the cluster's --max-txn-ops budget (etcd default 128).
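As a quick sanity check of that budget arithmetic, a standalone sketch (function and constant names here are illustrative, not taken from config.rs):

```rust
/// Txn-op budget for one split_replace transaction, per the counts above:
/// 4 + 2N writes and 5 + N compares, i.e. 9 + 3N total txn-ops.
const fn split_replace_txn_ops(children: usize) -> usize {
    let writes = 4 + 2 * children;
    let compares = 5 + children;
    writes + compares // = 9 + 3N
}

fn main() {
    let etcd_max_txn_ops = 128; // etcd's default --max-txn-ops
    for children in [8usize, 16, 39, 40] {
        let total = split_replace_txn_ops(children);
        let verdict = if total <= etcd_max_txn_ops { "fits" } else { "exceeds budget" };
        println!("children = {children:2} -> {total:3} txn-ops ({verdict})");
    }
    // The default cap of 8 gives 9 + 24 = 33 ops; N = 39 (126 ops) still fits
    // the default 128-op budget, while N = 40 (129 ops) would exceed it.
}
```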
The etcd config also carries the same per-tenant and global shard ceilings
used by the in-memory backend. The persisted backend enforces those limits
before register_shards, split_replace, and split_residual by counting
existing shard-record keys under the relevant etcd prefixes, then rejecting
growth that would exceed the configured caps.
The remaining mutating shard-lifecycle operations (complete, park_shard)
are fully persisted with terminal-state transitions, lease cleanup, and
idempotent replay, matching the same CAS-guarded pattern used by the other
shard operations. All RunManagement
(and AsyncRunManagement) trait operations (run terminal transitions,
unpark_shard) are fully persisted. The etcd backend also exposes
backend-specific helpers for active-run listing and GC of stale initializing
runs (see backend.rs file table below); these are inherent methods on
EtcdCoordinator, not part of the trait surfaces.
The module provides seven core capabilities:
- Shard lifecycle state machine -- a four-state automaton (Active, Done, Split, Parked) with terminal-state enforcement and compile-time discriminant stability.
- Lease-based ownership with fencing tokens -- time-bounded exclusive access enforced by monotonic FenceEpoch values that reject zombie workers (Kleppmann 2016).
- Bounded idempotency -- a 16-entry FIFO op-log backed by RingBuffer that caches operation fingerprints for replay detection, inspired by Stripe's idempotency key pattern.
- Deterministic split operations -- two strategies (replace and residual) that produce child shard IDs via domain-separated BLAKE3, with full coverage validation.
- Run-level management -- two-phase creation (create_run then register_shards), progress aggregation, and terminal transitions with their own bounded op-log.
- Tiger-style invariant enforcement -- crash-to-prevent-corruption philosophy where violated invariants panic before persistence, ensuring crash-recovery returns to the last valid state.
- Source-family handoff boundary -- coordination emits shard claims, key-range state, and cursors that runtime layers route through gossip-contracts::connector and gossip-scanner-runtime so CLI and distributed execution share one family-oriented orchestration surface.
gossip-coordination crate (crates/gossip-coordination/src/, excluding *_tests.rs except the intentionally documented scenario_tests.rs and test_fixtures.rs; src/sim/ is covered in simulation-harness.md)
| File | Role |
|---|---|
| traits.rs | CoordinationBackend and AsyncCoordinationBackend traits -- the semantic contract for all backends (sync and async variants) |
| record.rs | ShardRecord, ShardStatus, ParkReason, ShardSnapshotView |
| run.rs | RunRecord, RunStatus, RunConfig, RunManagement and AsyncRunManagement traits |
| split_execution.rs | Coordination-owned split execution helpers: derived shard IDs, payload hashing, result types |
| in_memory.rs | In-memory reference implementation (executable spec) |
| lease.rs | Lease, LeaseHolder, OpLogEntry, OpKind, OpResult |
| error.rs | Shared CoordError, InfraError (typed infrastructure errors), and IdempotentOutcome |
| run_errors.rs | Run-management error types |
| validation.rs | validate_lease, validate_cursor_update_pooled, check_op_idempotency |
| shard_limits.rs | Shared shard-count limit validation helpers and ShardLimitViolation snapshots used by both backends |
| events.rs | EventCollector, EventKind, StateTransitionEvent |
| facade.rs | CoordinationFacade, ShardClaiming, ClaimError |
| session.rs | WorkerSession ergonomic wrapper with move/borrow lifecycle |
| conformance.rs | Feature-gated backend-agnostic coordination conformance harness for SimulationBackend impls |
| scenario_tests.rs | Multi-step coordination workflow tests covering realistic lease/run stories |
| test_fixtures.rs | Shared seeded coordinators, canonical plans, and helper constructors used by coordination tests |
| lib.rs | Module root and public re-exports |
gossip-contracts coordination module (crates/gossip-contracts/src/coordination/)
| File | Role |
|---|---|
| mod.rs | Module root, re-exports, and boundary ownership map |
| cursor.rs | Cursor type with monotonicity and bounds semantics |
| shard_spec.rs | ShardSpec key-range type, CursorSemantics, split validation |
| pooled.rs | PooledShardSpec, PooledCursor — arena-pooled byte-field wrappers |
| restored_state.rs | RestoredShardState — grouped acquire/restore payload handed to runtime layers |
| limits.rs | Capacity constants for split fan-out (MAX_SPLIT_CHILDREN, MAX_SPAWNED_PER_SHARD) |
| manifest.rs | InitialShardInput, validate_manifest — shard manifest validation for register_shards |
| split.rs | Contracts-owned split planner core (SplitReplacePlan, SplitResidualPlan, plan_split_replace*, plan_split_residual*) |
gossip-coordination-etcd crate (crates/gossip-coordination-etcd/)
| File | Role |
|---|---|
| lib.rs | Module root and public re-exports |
| behavioral_conformance.rs | Real-etcd behavioral conformance scenarios that mirror the shared coordination harness using protocol operations plus persisted read-back oracles |
| backend.rs | Module root for the backend/ directory; shared free functions (key comparison, CAS delay, shard capacity counting) |
| backend/coordinator.rs | EtcdCoordinator (sync wrapper with owned Tokio runtime) and AsyncEtcdCoordinator (async core): shared low-level etcd RPC wrappers, CAS retry logic, data-access helpers (load/scan run and shard records), and inherent methods (list_active_runs_into, gc_stale_initializing_runs_into) |
| backend/run_management.rs | RunManagement and AsyncRunManagement impls: create/register/terminate runs, unpark shards, claim candidates |
| backend/shard_coordination.rs | CoordinationBackend and AsyncCoordinationBackend impls: shard acquisition, lease renewal, checkpointing, terminal transitions (complete, park_shard), and split operations |
| backend/test_support.rs | Feature-gated seeding, inspection, snapshot, and deterministic split fault-injection helpers for etcd integration tests |
| sim_etcd_kv.rs | Feature-gated in-memory model of the etcd KV subset used by coordination: point reads, prefix scans, CAS transactions, revision metadata, and lease-backed expiry for deterministic simulation |
| config.rs | Endpoint + namespace validation plus owner-lease TTL, optimistic retry tuning, shard count limits, and bounded split fanout |
| keyspace.rs | Deterministic ASCII etcd path construction plus typed exact-key wrappers for run/shard records, ownership, and active indexes |
| codec.rs | Explicit binary encoding/decoding for coordination records and shard-owner bindings persisted to etcd |
| codec_tests.rs | Round-trip, rejection, and proptest coverage for the binary codec |
| error.rs | etcd connection, codec, lease, and transaction error surfaces |
| sim_coordinator.rs | Feature-gated deterministic simulation adapter that runs the sync etcd backend over the in-memory KV model |
| test_support.rs | Testcontainers-backed lifecycle helpers plus namespace-isolated coordinator builders for integration tests |
| tests.rs | Config validation, keyspace path invariants (proptest), and ignored etcd integration tests covering the acquire/checkpoint/renew/split lifecycle, unpark, collision/limit paths, fault-injected split atomicity, lease expiry, and contention |
┌──────────────┐
│ Active │
└──┬───┬───┬──┘
│ │ │
Complete │ │ │ Park
┌────────┘ │ └────────┐
▼ │ ▼
┌────────┐ SplitReplace ┌────────┐
│ Done │ │ │ Parked │
└────────┘ ▼ └────────┘
┌────────┐
│ Split │
└────────┘
All transitions originate from Active. Done, Split, and Parked are
terminal within the coordination protocol -- once a shard reaches one of
these states, no CoordinationBackend operation may change its status. The
method ShardRecord::assert_transition_legal panics on any attempt to
transition from a terminal state to a different state.
Parked has one escape hatch: unpark_shard (via the RunManagement trait,
not CoordinationBackend) transitions Parked back to Active and bumps
the fence epoch to invalidate any zombie workers from a prior lease.
split_residual is special: it shrinks the parent's key range and spawns a
residual child shard, but the parent stays Active. It is the only split
strategy that does not retire the parent.
ShardStatus uses #[repr(u8)] with stable discriminants that are persisted
to durable storage. Compile-time assertions enforce the mapping:
| Variant | Discriminant | Terminal? |
|---|---|---|
| Active | 0 | No |
| Done | 1 | Yes |
| Split | 2 | Yes |
| Parked | 3 | Yes |
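A minimal sketch of how the discriminant mapping can be pinned at compile time; the enum mirrors the table above, but the assertion style is illustrative rather than copied from record.rs:

```rust
// Illustrative mirror of the four-state shard automaton with pinned
// discriminants; the `as u8` casts are stable because of #[repr(u8)].
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ShardStatus {
    Active = 0,
    Done = 1,
    Split = 2,
    Parked = 3,
}

impl ShardStatus {
    const fn is_terminal(self) -> bool {
        !matches!(self, ShardStatus::Active)
    }
}

// Compile-time assertions: if a discriminant ever drifts, the build fails
// before a record persisted under the old numbering could be misread.
const _: () = assert!(ShardStatus::Active as u8 == 0);
const _: () = assert!(ShardStatus::Done as u8 == 1);
const _: () = assert!(ShardStatus::Split as u8 == 2);
const _: () = assert!(ShardStatus::Parked as u8 == 3);

fn main() {
    assert!(ShardStatus::Parked.is_terminal());
    assert!(!ShardStatus::Active.is_terminal());
}
```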
When a shard is parked, a ParkReason (#[repr(u8)]) is stored alongside:
| Variant | Discriminant | Meaning |
|---|---|---|
| PermissionDenied | 0 | Connector lacks access -- requires credential rotation |
| NotFound | 1 | Scan target no longer exists |
| Poisoned | 2 | Internal inconsistency -- requires manual investigation |
| TooManyErrors | 3 | Transient errors accumulated -- suitable for auto-retry |
| Other | 4 | Catch-all; backend should log additional context separately |
The invariant park_reason.is_some() iff status == Parked is asserted on
every state transition via assert_invariants.
The coordination protocol uses bounded idempotency -- a short-term operation cache that allows workers to safely retry RPCs without causing duplicate side effects.
Each shard carries a 16-entry FIFO op-log implemented as
RingBuffer<OpLogEntry, 16>. Each entry stores five fields:
(OpId, OpKind, OpResult, payload_hash: u64, executed_at: LogicalTime)
- OpId: CSPRNG-generated idempotency key (unique per operation)
- OpKind: discriminant identifying the operation type (Checkpoint, Complete, Park, SplitReplace, SplitResidual, Unpark)
- OpResult: stored outcome (Completed, Error, Superseded)
- payload_hash: BLAKE3 fingerprint of the operation's parameters
- executed_at: logical timestamp of first execution
When an operation arrives with (op_id, payload_hash):
- Same (op_id, payload_hash) -- the op-log entry matches. Return the cached result as IdempotentOutcome::Replayed. No mutation occurs.
- Same op_id, different payload_hash -- the caller reused an OpId with different parameters. Return an OpIdConflict error. This is always a client bug.
- New op_id (not in the op-log) -- execute the operation, record the result in the op-log, return IdempotentOutcome::Executed.
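A minimal sketch of this replay-detection decision, using simplified stand-in types rather than the real OpLogEntry and RingBuffer:

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
struct OpId(u128);

#[derive(Clone, Copy)]
struct OpLogEntry {
    op_id: OpId,
    payload_hash: u64,
    result: u64, // stand-in for the cached OpResult
}

enum IdempotentOutcome<T> {
    Executed(T),
    Replayed(T),
}

#[derive(Debug)]
enum OpError {
    OpIdConflict,
}

fn check_or_execute(
    op_log: &mut Vec<OpLogEntry>, // RingBuffer<OpLogEntry, 16> in the real crate
    op_id: OpId,
    payload_hash: u64,
    execute: impl FnOnce() -> u64,
) -> Result<IdempotentOutcome<u64>, OpError> {
    if let Some(entry) = op_log.iter().find(|e| e.op_id == op_id) {
        if entry.payload_hash == payload_hash {
            // Same (op_id, payload_hash): return the cached result, no mutation.
            return Ok(IdempotentOutcome::Replayed(entry.result));
        }
        // Same op_id, different parameters: always a client bug.
        return Err(OpError::OpIdConflict);
    }
    // New op_id: execute, record, report first execution.
    let result = execute();
    if op_log.len() == 16 {
        op_log.remove(0); // the real RingBuffer evicts with an O(1) overwrite
    }
    op_log.push(OpLogEntry { op_id, payload_hash, result });
    Ok(IdempotentOutcome::Executed(result))
}

fn main() {
    let mut op_log = Vec::new();
    let op = OpId(1);
    let first = check_or_execute(&mut op_log, op, 0xABCD, || 42).unwrap();
    let retry = check_or_execute(&mut op_log, op, 0xABCD, || unreachable!()).unwrap();
    assert!(matches!(first, IdempotentOutcome::Executed(42)));
    assert!(matches!(retry, IdempotentOutcome::Replayed(42)));
    // Reusing the OpId with different parameters is rejected.
    assert!(check_or_execute(&mut op_log, op, 0x9999, || 0).is_err());
}
```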
Idempotency is checked before lease validation on every idempotent path, so that replays succeed even after the lease has expired or the shard has reached a terminal status.
When the op-log reaches capacity (16 entries), the oldest entry is evicted via
O(1) ring buffer overwrite (push_back_overwrite). After eviction, the
coordinator can no longer detect a retry for that OpId and treats it as a
new operation.
Post-eviction re-execution is safe because:
- Staleness guarantee -- eviction implies the OpId is at least 16 operations old, well past any reasonable retry window.
- Convergent state transitions -- re-executing a shard operation either converges to the same terminal state or is rejected by status guards (e.g., completing an already-Done shard).
- FenceEpoch is the primary zombie defense -- the op-log is a secondary defense for in-lease retries only. A stale worker from a prior lease epoch is fenced out by epoch comparison before the op-log is ever consulted.
Payload hashes use domain-separated BLAKE3 via the OP_PAYLOAD_V1 domain
constant, with per-operation tags prepended as a second domain-separation
layer:
| Operation | Tag bytes | Hashed fields |
|---|---|---|
| checkpoint | b"checkpoint" | Cursor (via CanonicalBytes) |
| complete | b"complete" | Cursor (via CanonicalBytes) |
| park | b"park" | ParkReason (via CanonicalBytes) |
| split_replace | b"split_replace" | SplitReplacePlan (length-prefixed children) |
| split_residual | b"split_residual" | SplitResidualPlan (parent + residual specs) |
The per-operation tag ensures that a checkpoint and a complete with the same cursor produce different hashes.
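A sketch of the two-layer domain separation using the blake3 crate's derive-key mode; the exact OP_PAYLOAD_V1 context string and the truncation to the u64 fingerprint are assumptions:

```rust
// Layer 1: a protocol-wide derive-key context. Layer 2: a per-operation tag
// prepended to the canonical payload bytes.
fn op_payload_hash(op_tag: &[u8], canonical_payload: &[u8]) -> u64 {
    let mut hasher = blake3::Hasher::new_derive_key("OP_PAYLOAD_V1");
    hasher.update(op_tag);
    hasher.update(canonical_payload);
    let digest = hasher.finalize();
    u64::from_le_bytes(digest.as_bytes()[..8].try_into().unwrap())
}

fn main() {
    let cursor_bytes = b"repo/acme/config.yaml"; // stand-in for CanonicalBytes of a Cursor
    let a = op_payload_hash(b"checkpoint", cursor_bytes);
    let b = op_payload_hash(b"complete", cursor_bytes);
    // Identical cursor, different operation tag -> different fingerprints.
    assert_ne!(a, b);
}
```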
Several alternative op-log storage structures were considered and rejected:
| Data Structure | Rejection Reason |
|---|---|
| Vec | O(n) eviction via remove(0) -- slides all elements |
| VecDeque | Heap-allocated -- violates zero-alloc hot-path requirement |
| ArrayVec | No ring semantics -- eviction requires manual shift |
| Option<T> array | 25% memory overhead for empty slots; manual index bookkeeping |
RingBuffer provides O(1) push/evict, stack-allocated [MaybeUninit<T>; N]
storage, and power-of-2 bitwise index calculation.
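For illustration, a simplified safe ring buffer with power-of-2 index masking; the real RingBuffer stores [MaybeUninit<T>; N] rather than requiring Copy + Default, but the O(1) overwrite-on-full behavior is the same idea:

```rust
struct RingBuffer<T, const N: usize> {
    slots: [T; N],
    head: usize, // index of the oldest entry
    len: usize,
}

impl<T: Copy + Default, const N: usize> RingBuffer<T, N> {
    fn new() -> Self {
        assert!(N.is_power_of_two());
        Self { slots: [T::default(); N], head: 0, len: 0 }
    }

    /// Append, overwriting the oldest entry once full. Always O(1).
    fn push_back_overwrite(&mut self, value: T) {
        let tail = (self.head + self.len) & (N - 1); // bitwise index wrap
        self.slots[tail] = value;
        if self.len == N {
            self.head = (self.head + 1) & (N - 1); // oldest entry evicted
        } else {
            self.len += 1;
        }
    }

    fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        (0..self.len).map(move |i| &self.slots[(self.head + i) & (N - 1)])
    }
}

fn main() {
    let mut log: RingBuffer<u64, 16> = RingBuffer::new();
    for op in 0..20 {
        log.push_back_overwrite(op); // ops 0..=3 are evicted as 16..=19 land
    }
    assert_eq!(*log.iter().next().unwrap(), 4); // oldest surviving entry
}
```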
Initializing ──register_shards──→ Active
     │                              │
     │ cancel_run          ┌────────┼────────┐
     ▼                 complete fail_run  cancel
 Cancelled                 │        │        │
                           ▼        ▼        ▼
                         Done    Failed  Cancelled
A "run" is a single scan invocation -- it groups a set of shards that collectively cover the target data source. The coordinator tracks run status, validates shard manifests, and provides progress aggregation.
RunStatus uses #[repr(u8)] with stable discriminants:
| Variant | Discriminant | Terminal? |
|---|---|---|
| Initializing | 0 | No |
| Active | 1 | No |
| Done | 2 | Yes |
| Failed | 3 | Yes |
| Cancelled | 4 | Yes |
Run creation is a two-phase process (D2.20):
1. create_run(tenant, run_id, config) -- creates a RunRecord in Initializing status with no shards. The RunConfig carries cursor_semantics, lease_duration, and max_shard_retries.
2. register_shards(tenant, run_id, shards, op_id) -- validates the shard manifest (uniqueness, non-empty, bounded key ranges), creates ShardRecord entries, and transitions the run to Active. This step is idempotent via OpId.
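A mini-model of the two-phase rule, with simplified stand-in types rather than the real RunRecord and RunStatus (and without the OpId replay path):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum RunStatus { Initializing, Active, Done, Failed, Cancelled }

struct Run { status: RunStatus, root_shards: Vec<u64> }

#[derive(Debug)]
enum RegisterError { NotInitializing, EmptyManifest }

fn register_shards(run: &mut Run, shards: Vec<u64>) -> Result<(), RegisterError> {
    if run.status != RunStatus::Initializing {
        return Err(RegisterError::NotInitializing);
    }
    if shards.is_empty() {
        return Err(RegisterError::EmptyManifest);
    }
    run.root_shards = shards;
    run.status = RunStatus::Active; // phase two: manifest accepted, run goes Active
    Ok(())
}

fn main() {
    // Phase one: create_run leaves the record in Initializing with no shards.
    let mut run = Run { status: RunStatus::Initializing, root_shards: Vec::new() };
    // Phase two: register_shards validates the manifest and activates the run.
    register_shards(&mut run, vec![1, 2, 3]).unwrap();
    assert_eq!(run.status, RunStatus::Active);
    // A repeated registration is rejected here; the real backend replays it via OpId.
    assert!(register_shards(&mut run, vec![4]).is_err());
}
```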
RunRecord is the coordinator's authoritative record for a run. It contains:
- Identity: tenant, run (RunId)
- Configuration: config (RunConfig)
- Lifecycle: status (RunStatus), created_at, completed_at
- Shard manifest: root_shards (Vec of ShardId)
- Idempotency: op_log (RingBuffer of RunOpLogEntry, cap: 8)
RunManagement is a separate trait from CoordinationBackend (D2.22). It
defines run-level and admin operations. AsyncRunManagement provides an
identical async counterpart for I/O-bound backends:
| Method | Description | Idempotent? | Lease-gated? |
|---|---|---|---|
| create_run | Create run in Initializing status | No | No |
| register_shards | Populate shards, transition to Active | Yes (OpId) | No |
| get_run | Return run record (read-only) | N/A | No |
| get_run_progress | Aggregate shard status counts | N/A | No |
| list_shards_into | Return filtered shard summaries | N/A | No |
| complete_run | Transition Active to Done | Yes (OpId) | No |
| fail_run | Transition Active to Failed | Yes (OpId) | No |
| cancel_run | Transition non-terminal to Cancelled | Yes (OpId) | No |
| unpark_shard | Resume Parked shard to Active | Yes (OpId) | No |
| create_run_with_shards | Convenience: create + register | Yes (OpId) | No |
| collect_claim_candidates_into | Hot-path claim candidate scan | N/A | No |
Admin operations (unpark, cancel) are not lease-gated (D2.21) -- they
are coordinator-level actions that do not require a worker to hold a lease.
When a shard's key range becomes too large or unevenly distributed, the coordinator splits it into smaller shards. Two strategies exist:
The parent is retired (status transitions to Split) and replaced by >= 2
children that collectively cover the parent's entire key range.
Execution phases:
- Validate -- idempotency check, lease validation, full-coverage validation via validate_split_coverage, spawn-cap guard.
- Build -- derive deterministic child IDs via derive_split_shard_id with DerivedShardKind::Child and sequential indices starting at parent.spawned.len(). Construct child ShardRecord entries.
- Apply -- transition the parent to Split (terminal), insert children into the map, update the run-shard index.
After split-replace, the parent is terminal. No further operations can push op-log entries, so the split-replace op-log entry is never evicted -- guaranteeing idempotent replay detection forever.
The parent shrinks its key range (stays Active) and a new residual shard
covers the remainder. The parent keeps its lease and continues processing.
Execution phases:
- Validate -- idempotency (two-tier: op-log primary, spawned probe as defense-in-depth), lease validation, coverage validation via validate_residual_split, cursor-bounds check (parent's cursor must remain within the shrunk range), spawn-cap guard.
- Build -- derive the residual ID via derive_split_shard_id with DerivedShardKind::Residual. The residual starts with Cursor::initial().
- Apply -- update the parent's spec and spawned list, insert the residual into the map, update the run-shard index. The parent keeps its lease.
Because the parent stays Active, subsequent operations can evict the
split-residual op-log entry. The find_replayed_residual function provides
a secondary replay detection path by scanning parent.spawned for a
residual derived from the same OpId.
Child and residual shard IDs are derived deterministically via
derive_split_shard_id, which uses domain-separated BLAKE3 (SPLIT_ID_V1)
with five inputs: (run, parent_shard, op_id, kind, index). The output has
bit 63 set to distinguish derived shards from externally-assigned root
shards.
Birthday collision bound: ~2^31.5 values before 50% collision probability (63 effective bits). Acceptable for coordination use cases where the total number of derived shards per run is bounded.
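A sketch of the derivation; the byte encoding of the five inputs and the SPLIT_ID_V1 context value are assumptions, while the five ingredients, determinism, and the bit-63 rule come from the description above:

```rust
#[derive(Clone, Copy)]
enum DerivedShardKind { Child = 0, Residual = 1 }

fn derive_split_shard_id(
    run: u64,
    parent_shard: u64,
    op_id: u128,
    kind: DerivedShardKind,
    index: u32,
) -> u64 {
    let mut hasher = blake3::Hasher::new_derive_key("SPLIT_ID_V1");
    hasher.update(&run.to_le_bytes());
    hasher.update(&parent_shard.to_le_bytes());
    hasher.update(&op_id.to_le_bytes());
    hasher.update(&[kind as u8]);
    hasher.update(&index.to_le_bytes());
    let digest = hasher.finalize();
    let raw = u64::from_le_bytes(digest.as_bytes()[..8].try_into().unwrap());
    raw | (1u64 << 63) // derived shards always carry bit 63
}

fn main() {
    let id = derive_split_shard_id(7, 42, 1, DerivedShardKind::Child, 0);
    assert_eq!(id >> 63, 1, "derived IDs are distinguishable from root shards");
    // Same inputs, same ID: a retried split re-derives identical children.
    assert_eq!(id, derive_split_shard_id(7, 42, 1, DerivedShardKind::Child, 0));
}
```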
| Constant | Value | Purpose |
|---|---|---|
| MAX_SPLIT_CHILDREN | 256 | Maximum children in a single split-replace |
| MAX_SPAWNED_PER_SHARD | 1024 | Maximum cumulative children + residuals per parent |
Compile-time assertion: MAX_SPLIT_CHILDREN <= MAX_SPAWNED_PER_SHARD.
Both split operations use the remove-mutate-restore pattern: the parent
record is temporarily removed from the HashMap, mutated inside a closure,
then restored on both success and failure paths. This avoids holding a
&mut ShardRecord (from get_mut) while also inserting new child entries
into the same HashMap. If the closure panics (invariant violation), the
parent is intentionally not restored -- an invariant panic indicates
irrecoverable corruption.
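A sketch of the pattern over a plain HashMap with stand-in record types; only the remove/restore choreography mirrors the description above:

```rust
use std::collections::HashMap;

// Take the parent record out of the map, let the closure mutate it and insert
// children without holding a &mut borrow of the map entry, then put the parent
// back on both the success and the error path. A panic inside the closure
// deliberately skips the restore (crash-to-prevent-corruption).
fn split_with_restore<R, E>(
    shards: &mut HashMap<u64, String>, // stand-in for the shard map
    parent_id: u64,
    mutate: impl FnOnce(&mut String, &mut HashMap<u64, String>) -> Result<R, E>,
) -> Result<R, E> {
    let mut parent = shards
        .remove(&parent_id)
        .expect("caller verified the parent exists");
    let outcome = mutate(&mut parent, shards);
    shards.insert(parent_id, parent); // restore on success and on rejection
    outcome
}

fn main() {
    let mut shards = HashMap::from([(1, "parent".to_string())]);
    let children: Result<_, ()> = split_with_restore(&mut shards, 1, |parent, map| {
        map.insert(2, format!("{parent}/child-a"));
        map.insert(3, format!("{parent}/child-b"));
        Ok(vec![2u64, 3])
    });
    assert_eq!(children.unwrap(), vec![2, 3]);
    assert!(shards.contains_key(&1)); // parent restored after the mutation
}
```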
A Lease is the capability returned by acquire_and_restore_into and required by
all lease-gated mutations. It contains:
- tenant: TenantId -- tenant isolation scope
- run: RunId, shard: ShardId -- shard identity
- owner: WorkerId -- the worker holding the lease
- fence: FenceEpoch -- monotonically increasing epoch (fencing token)
- deadline: LogicalTime -- expiry time
Fields are private -- the coordinator constructs leases and workers read them via accessors.
Every acquire_and_restore_into increments the shard's fence_epoch. Subsequent
mutations must present a lease whose fence matches the record's current
epoch. If the epoch does not match, the operation is rejected with
StaleFence -- the worker is a zombie from a prior ownership period.
This is the "fencing token protocol" from Kleppmann (2016): all writes carry the fence epoch, and the backend rejects stale epochs.
validate_lease checks preconditions in priority order to prevent information
leakage:
- Tenant isolation (SEC-1) -- wrong-tenant requests are rejected before any internal state is revealed
- Terminal status -- fast rejection of dead shards
- Fence epoch -- zombie fencing
- Lease expiry -- time-based rejection (now >= deadline)
- Owner divergence -- catches identity mismatches when epochs agree
LeaseHolder bundles owner: WorkerId and deadline: LogicalTime into a
single value so that ShardRecord can store Option<LeaseHolder> instead of
two separate Option fields -- making the "both-present-or-both-absent"
invariant structurally impossible to violate.
The coordination protocol follows the crash-to-prevent-corruption philosophy: a violated invariant panics immediately, the operation is NOT persisted, and on crash-recovery the shard returns to its pre-operation state.
Every mutation path calls ShardRecord::assert_invariants() before returning.
RunRecord::assert_invariants() performs analogous checks at the run level.
| # | Invariant | Check |
|---|---|---|
| 1 | park_reason.is_some() iff status == Parked | Status-reason consistency |
| 2 | (structural) | Option<LeaseHolder> makes paired-ness implicit |
| 3 | Terminal shards must not hold a lease | status.is_terminal() implies lease.is_none() |
| 4 | fence_epoch >= FenceEpoch::INITIAL | Fence epoch minimum (>= 1) |
| 5 | op_log.len() <= OP_LOG_CAP | Op-log bounded (defense-in-depth; RingBuffer enforces structurally) |
| 6 | status == Split implies !spawned.is_empty() | Split shards must have children |
| 7 | parent.is_some() iff shard.is_derived() | Biconditional: bit 63 set iff has parent |
| 8 | All entries in spawned satisfy is_derived() == true | Spawned children must be derived |
| 9 | Op-log entries have unique OpId values | No duplicate idempotency keys |
| 10 | spawned.len() <= MAX_SPAWNED_PER_SHARD | Spawned count bounded at 1024 |
INV-9 is O(n^2) where n <= 16 (at most 120 comparisons) -- dominated by the per-transition persistence cost.
The core safety properties (mutual exclusion, zombie rejection, fence monotonicity, terminal irreversibility, split atomicity, and cursor monotonicity) are also verified exhaustively by the TLA+ model checker across all reachable states. See specs/coordination/README.md for the specification, property mapping, and instructions for running TLC.
Invariant panics should be treated as critical bugs. Monitor for coordinator process crashes and alert immediately. The shard's durable state is safe (the failing operation was not persisted), but the root cause must be investigated.
CoordinationBackend is the core trait with 7 operations that every backend (in-memory, FoundationDB,
PostgreSQL, deterministic simulator) must implement. An async counterpart,
AsyncCoordinationBackend, provides identical semantics with async fn
methods for I/O-bound backends (etcd, FoundationDB, PostgreSQL) that benefit
from non-blocking execution:
| Operation | Signature (simplified) | Terminal? | Idempotent? | Lease-gated? |
|---|---|---|---|---|
| acquire_and_restore_into | (now, tenant, key, worker, &mut AcquireScratch) -> Result<AcquireResultView, AcquireError> | No | No | No |
| renew | (now, tenant, lease) -> Result<RenewResult, RenewError> | No | No | Yes |
| checkpoint | (now, tenant, lease, cursor, op_id) -> () | No | Yes (OpId) | Yes |
| complete | (now, tenant, lease, cursor, op_id) -> () | Yes (Done) | Yes (OpId) | Yes |
| park_shard | (now, tenant, lease, reason, op_id) -> () | Yes (Parked) | Yes (OpId) | Yes |
| split_replace | (now, tenant, lease, plan, op_id) -> child_ids | Yes (Split) | Yes (OpId) | Yes |
| split_residual | (now, tenant, lease, plan, op_id) -> residual_id | No | Yes (OpId) | Yes |
acquire_and_restore_into -- the entry point for a worker to start or resume
scanning. Verifies the shard is Active and unleased (or lease expired),
increments fence_epoch, grants a new lease, and populates a caller-owned
AcquireScratch buffer with the shard's last checkpointed cursor. Returns an
AcquireResultView containing lease, snapshot, and capacity: CapacityHint.
The scratch parameter is a key zero-alloc design choice (see allocation policy).
NOT idempotent: each successful call increments the fence epoch.
renew -- extends the lease deadline without modifying shard progress.
Returns RenewResult containing new_deadline and capacity: CapacityHint.
The fence epoch does NOT change. Not idempotent via OpId; duplicate calls
simply extend the deadline further (harmless).
checkpoint -- persists a new cursor position (idempotent). Validates
cursor monotonicity (new.last_key >= old.last_key lexicographic) and
cursor bounds (last_key within [spec.start, spec.end)).
complete -- marks the shard as successfully done (terminal). Records a
final cursor, releases the lease, and transitions to Done.
park_shard -- halts the shard due to an error condition (terminal).
Records a ParkReason, releases the lease, and transitions to Parked.
split_replace -- replaces the parent with N child shards (terminal for
parent). Validates split coverage (children must exactly partition the
parent's key range). Children inherit cursor_semantics from the parent.
split_residual -- shrinks the parent and spawns a residual (non-terminal
for parent). Validates that the new parent range + residual range partition
the old range, and that the parent's cursor remains within the shrunk range.
Every backend implementation must maintain these invariants:
- Tenant isolation -- a request scoped to tenant A must never read or write shard records belonging to tenant B.
- Fence epoch monotonicity -- fence_epoch is monotonically non-decreasing per shard; it increments on every ownership transfer.
- Idempotency -- for any operation with an OpId: the same (op_id, payload_hash) returns the cached result; the same op_id with a different hash returns OpIdConflict; a new op_id executes fresh.
- Cursor monotonicity -- across checkpoints within the same lease epoch, cursor.last_key must be lexicographically non-decreasing.
- Cursor bounds -- cursor.last_key must fall within the shard's [spec.start, spec.end).
- Split coverage -- split children must exactly partition the parent's key range (no gaps, no overlaps).
- Terminal irreversibility -- once a shard reaches Done, Split, or Parked, no protocol operation changes its status.
The coordination layer uses CursorUpdate<'a> — a borrowed cursor view:
┌──────────────────────────────────────────────────────┐
│ last_key: Option<&'a [u8]> │
│ → coordinator-visible, lex-comparable │
│ → represents the last item key fully processed │
├──────────────────────────────────────────────────────┤
│ token: Option<&'a [u8]> │
│ → connector-opaque resume state │
│ → pagination cursor, continuation token, etc. │
└──────────────────────────────────────────────────────┘
CursorUpdate<'a> is a borrowed view used across CoordinationBackend
trait methods. Internally, the in-memory coordinator stores cursor fields as
PooledCursor — a pair of Option<ByteSlot> handles into a shared
ByteSlab — eliminating per-checkpoint heap allocations on the hot path.
The PooledCursor::from_update and PooledCursor::last_key(slab) methods
convert between the pooled representation and borrowed views at API
boundaries. The connector boundary uses the owned connector::Cursor type,
which converts to/from CursorUpdate.
| old.last_key | new.last_key | Verdict |
|---|---|---|
| None | None | OK -- no-op checkpoint |
| None | Some(k) | OK -- first progress |
| Some(a) | Some(b) where b >= a | OK -- forward progress |
| Some(a) | Some(a) | OK -- idempotent retry |
| Some(_) | None | REJECT -- reset to none |
| Some(a) | Some(b) where b < a | REJECT -- regression |
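A sketch of the monotonicity rule from the table, with last_key modeled as Option<&[u8]>:

```rust
#[derive(Debug, PartialEq)]
enum CursorViolation { ResetToNone, Regression }

fn validate_last_key_monotonic(
    old: Option<&[u8]>,
    new: Option<&[u8]>,
) -> Result<(), CursorViolation> {
    match (old, new) {
        (Some(_), None) => Err(CursorViolation::ResetToNone),
        (Some(a), Some(b)) if b < a => Err(CursorViolation::Regression),
        _ => Ok(()), // None->None, None->Some, equal, or forward progress
    }
}

fn main() {
    let (a, b): (&[u8], &[u8]) = (b"a", b"b");
    assert!(validate_last_key_monotonic(None, Some(a)).is_ok());
    assert!(validate_last_key_monotonic(Some(a), Some(a)).is_ok());
    assert!(validate_last_key_monotonic(Some(a), Some(b)).is_ok());
    assert_eq!(validate_last_key_monotonic(Some(b), Some(a)), Err(CursorViolation::Regression));
    assert_eq!(validate_last_key_monotonic(Some(a), None), Err(CursorViolation::ResetToNone));
}
```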
A per-run configuration (#[repr(u8)]) that controls when cursor advancement
counts as committed progress:
- Completed (0) -- strongest guarantee. The cursor only advances after all work up to that point is fully processed and results are durable.
- Dispatched (1) -- weaker but higher throughput. The cursor advances after work is durably dispatched but not necessarily fully processed.
The coordinator enforces monotonicity and bounds identically under both semantics.
ShardSpec defines a shard's key range as a half-open interval
[start, end) in lexicographic byte order, plus opaque connector metadata.
Empty start ([]) means "start of keyspace"; empty end ([]) means
"end of keyspace" (unbounded).
Reference: Bigtable (Chang et al., OSDI 2006), Spanner (Corbett et al.,
OSDI 2012), CockroachDB, FoundationDB (Zhou et al., SIGMOD 2021) -- all use
half-open [start, end) byte-key ranges.
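A sketch of the containment rule, including the empty-bytes sentinels; it mirrors the prose above, not the real ShardSpec API:

```rust
// Half-open [start, end) over lexicographic byte order. An empty start means
// "beginning of keyspace"; an empty end means "unbounded above".
fn range_contains(start: &[u8], end: &[u8], key: &[u8]) -> bool {
    let after_start = key >= start; // empty start is the minimum, so always true
    let before_end = end.is_empty() || key < end;
    after_start && before_end
}

fn main() {
    assert!(range_contains(b"", b"", b"anything")); // whole keyspace
    assert!(range_contains(b"a", b"m", b"hello"));  // inside [a, m)
    assert!(!range_contains(b"a", b"m", b"m"));     // end is exclusive
    assert!(!range_contains(b"a", b"m", b"z"));     // past the range
    assert!(range_contains(b"a", b"", b"zzz"));     // empty end: unbounded
}
```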
| ID | Decision |
|---|---|
| D2.1 | Two-layer (last_key, token) cursor structure |
| D2.2 | ShardSpec has half-open key range [start, end) with lex-ordered byte boundaries |
| D2.3 | Cursor monotonicity is a hard safety invariant |
| D2.4 | Cursor bounds checking is a hard safety invariant |
| D2.5 | A checkpoint requires a last_key |
| D2.6 | ShardStatus: exactly 4 states (Active, Done, Split, Parked) |
| D2.7 | park_reason.is_some() iff status == Parked |
| D2.8 | Payload hashes use domain-separated BLAKE3 with CanonicalBytes |
| D2.9 | AcquireScratch + AcquireResultView enable zero-alloc acquire path |
| D2.10 | Derived shard IDs have bit 63 set |
| D2.11 | ShardRecord is self-contained (no back-references to RunConfig) |
| D2.12 | ShardSnapshotView includes status, spec, cursor, cursor_semantics, parent, spawned; excludes lease, fence, op_log, tenant, park_reason |
| D2.13 | Sync trait is the primary contract (Result, not futures); AsyncCoordinationBackend and AsyncRunManagement provide async counterparts for I/O-bound backends |
| D2.14 | Lease-gated operations take (TenantId, Lease) |
| D2.15 | acquire_and_restore_into is the only non-lease operation |
| D2.16 | Error types are operation-specific enums via From<CoordError> |
| D2.17 | now: LogicalTime passed explicitly (deterministic simulation) |
| D2.18 | RunRecord is the authoritative run record |
| D2.19 | RunStatus: 5 states (Initializing, Active, Done, Failed, Cancelled) |
| D2.20 | Two-phase creation: create_run then register_shards |
| D2.21 | Admin operations not lease-gated |
| D2.22 | RunManagement separate from CoordinationBackend |
| D2.23 | LogicalTime passed explicitly to all mutating operations |
| D2.24 | Shard listing returns ShardSummary (lightweight) |
| D2.25 | RunRecord has its own op-log (cap: 8) |
| D2.26 | Arena-pooled byte storage via ByteSlab for ShardRecord variable-size fields |
InfraError represents backend-agnostic infrastructure failures from
coordination backends. It classifies errors into two variants:
| Variant | Retryable? | Meaning |
|---|---|---|
| Transient { operation, message } | Yes | Network timeouts, gRPC failures, CAS retry budget exhausted |
| Corruption { operation, message } | No | Codec decode failures, op-log kind mismatches, missing records |
The operation field identifies the failing step as a human-readable label
(e.g., "acquire.load_shard", "renew.txn"). Every operation-specific
error type carries a BackendError(InfraError) variant for surfacing
infrastructure failures alongside protocol errors. InfraError implements
std::error::Error and is returned via Error::source() for error chain
traversal.
A shared CoordError enum with 12 variants provides the building blocks for
all operation-specific errors. Variants cover tenant isolation, fencing, lease
expiry, terminal status, OpId conflicts, cursor violations, split validation
failures, and missing checkpoint keys.
Note: CheckpointError, CompleteError, and SplitError also carry a ResourceExhausted(SlabFull) variant from From<SlabFull> impls. This variant is not routed through CoordError — it originates directly from ByteSlab allocation failures on the pooled storage path.
Each operation has its own error type that wraps CoordError via
From<CoordError>. The From impls explicitly enumerate all rejected
variants (no wildcard _ catch-all), so adding a new CoordError variant
triggers a compile error in every From impl, forcing a conscious routing
decision.
| Operation | Error Type | OpIdConflict? | Cursor Variants? | Split Variants? |
|---|---|---|---|---|
| acquire_and_restore_into | AcquireError | No | No | No |
| renew | RenewError | No | No | No |
| checkpoint | CheckpointError | Yes | Yes | No |
| complete | CompleteError | Yes | Yes | No |
| park_shard | ParkError | Yes | No | No |
| split_replace | SplitReplaceError | Yes | No | Yes |
| split_residual | SplitResidualError | Yes | No | Yes |
Note: SplitReplaceError and SplitResidualError are type aliases for the same underlying SplitError enum. They share identical variant sets.
The IdempotentOutcome<T> wrapper distinguishes first execution from replay:
pub enum IdempotentOutcome<T> {
    Executed(T), // first execution
    Replayed(T), // retry -- result from op-log
}
Callers generally do not need to distinguish -- the result is the same. The distinction is useful for observability (metrics, logging).
- SEC-1: TenantMismatch errors only expose expected (the caller's tenant), never the actual tenant -- prevents cross-tenant enumeration.
- SEC-5: AlreadyLeased errors redact current_owner in both Display and Debug to prevent worker identity leakage.
- SEC-6: OpIdConflict errors redact expected_hash and actual_hash in both Display and Debug.
InMemoryCoordinator is the executable specification for the shard
coordination protocol. It implements both CoordinationBackend and
RunManagement.
- Single-threaded -- &mut self serializes all operations, eliminating concurrency concerns so invariants can be verified in-line.
- Purely in-memory -- a two-level AHashMap<TenantId, AHashMap<ShardKey, ShardRecord>>. No I/O, no transactions, no retries.
- Arena-pooled byte storage -- a shared slab: ByteSlab owns all variable-size byte data (key-range start/end, metadata, cursor last-key, cursor token). ShardRecord fields hold PooledShardSpec and PooledCursor wrappers whose ByteSlot handles index into the slab. This replaces per-field Box<[u8]> heap allocations with slab operations on the checkpoint and acquire_and_restore_into hot paths (D2.26).
- Tiger-style invariant enforcement -- every mutation path calls ShardRecord::assert_invariants() before returning.
Shards use a two-level map: the outer level (TenantId) provides O(1) tenant
isolation -- a wrong-tenant lookup misses at the outer map without scanning
any shard records. The inner level (ShardKey) reduces hash input from 48
bytes (composite key) to 16 bytes. A total_shard_count is maintained inline
for O(1) global limit checks.
The coordinator enforces per-tenant and global shard count limits to prevent
split-flooding (CWE-400). The in-memory backend uses check_shard_limits
before every split and shard registration, accounting for temporarily-removed
records in the remove-mutate-restore pattern. The etcd backend enforces the
same limits by counting persisted shard-record keys under the tenant subtree
and the global tenant root before publishing new shard records.
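A sketch of the limit check; the names and signature are illustrative, not the real check_shard_limits:

```rust
// Reject growth that would push either the per-tenant or the global shard
// count past its ceiling (split-flood guard, CWE-400).
#[derive(Debug, PartialEq)]
enum ShardLimitViolation { TenantCap, GlobalCap }

fn check_shard_limits(
    tenant_count: usize,
    total_count: usize,
    new_shards: usize,
    max_per_tenant: usize,
    max_total: usize,
) -> Result<(), ShardLimitViolation> {
    if tenant_count + new_shards > max_per_tenant {
        return Err(ShardLimitViolation::TenantCap);
    }
    if total_count + new_shards > max_total {
        return Err(ShardLimitViolation::GlobalCap);
    }
    Ok(())
}

fn main() {
    // A split that would create 4 children against a per-tenant cap of 10.
    assert!(check_shard_limits(6, 20, 4, 10, 100).is_ok());
    assert_eq!(check_shard_limits(7, 20, 4, 10, 100), Err(ShardLimitViolation::TenantCap));
}
```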
The following types are re-exported from lib.rs and are part of the crate's
public API but are not covered in the operation-level sections above.
Runtime configuration for the InMemoryCoordinator, controlling capacity limits
and slab sizing.
pub struct CoordinatorRuntimeConfig {
    pub default_lease_duration: u64,
    pub max_shards_per_tenant: usize,
    pub max_total_shards: usize,
    pub claim_cooldown_interval: u64,
    pub slab_capacity: usize,
}
Default constants: DEFAULT_MAX_AUTO_SLAB_CAPACITY, DEFAULT_MAX_SHARDS_PER_TENANT,
DEFAULT_MAX_TOTAL_SHARDS.
Tracks shard-level progress for a run, returned by RunManagement::get_run_progress.
Includes a watermark field for tracking convergence.
RunTerminalEvaluation represents the result of evaluating whether a run has
reached a terminal decision point based on shard progress:
StillActive while any shard remains Active, HasFailures when all shards
have settled but one or more are Parked, and AllDone only when all settled
shards are Done or Split.
evaluate_run_terminal is the pure function that performs this evaluation.
Predicate type for list_shards_into queries with named constructors:
| Constructor | Semantics |
|---|---|
| all() | No filtering -- returns all shards |
| active() | Shards in non-terminal states |
| available() | Shards available for claiming |
| parked() | Parked shards only |
The root_only field controls whether only root (non-child) shards are included.
Lightweight view of a shard returned by list_shards_into. Contains fields for
status, spec bounds, cursor position, lease deadline, acquire count, key range
boundaries, and parent/child relationships.
ShardArena is the slab allocator for shard spec byte data. ShardSpecHandle
is a reference into the arena that provides zero-copy access to shard spec
fields. Both are re-exported for use by coordination consumers that need to
construct or inspect shard specs.
Validates that a residual split plan's key ranges are within the parent shard's bounds. Re-exported for use by split planning code outside the coordination crate.
The coordination layer emits structured events for observability. Events are pure inert data — the coordination layer never acts on them. Side effects (metrics, notifications, dashboards) are the caller's responsibility. This keeps the coordination layer deterministic, testable, and compatible with simulation.
- Events are returned alongside operation results via EventCollector, not via callbacks or subscriptions.
- Events are not wired into the CoordinationBackend trait. Backends that want emission implement companion _with_events methods accepting &mut EventCollector.
- EventKind is #[non_exhaustive] — new variants can be added without breaking downstream.
- EventKind must not derive Serialize because ShardCheckpointed contains RedactedKey, which only redacts via Debug, not serialization.
| Variant | Terminal? | Category |
|---|---|---|
| ShardAcquired { shard, worker, fence_epoch } | No | Shard |
| ShardCheckpointed { shard, last_key: RedactedKey } | No | Shard |
| ShardResidualCreated { parent, residual } | No | Shard |
| ShardUnparked { shard, new_fence_epoch } | No | Shard |
| ShardLeaseRenewed { shard, new_deadline } | No | Shard |
| ShardCompleted { shard } | Yes | Shard |
| ShardParked { shard, reason } | Yes | Shard |
| ShardSplit { parent, children } | Yes | Shard |
| RunCreated | No | Run |
| RunActivated { shard_count } | No | Run |
| RunCompleted | Yes | Run |
| RunFailed | Yes | Run |
| RunCancelled | Yes | Run |
Classification methods (is_shard_event, is_run_event,
is_terminal) are exhaustive matches — the compiler enforces that new
variants are handled.
Wraps Option<Box<[u8]>> to prevent accidental logging of sensitive
key material. Debug shows byte length (Some(<15 bytes>)) not
contents.
Envelope carrying (tenant, run, at: LogicalTime, kind: EventKind).
Every event can be routed by tenant, correlated to a run, and ordered
by logical timestamp without pattern-matching the kind.
Collects events during a coordination operation. Supports optional
hard capacity ceiling (with_max_capacity) — excess events are
silently dropped. Panics on cross-tenant mixing (invariant: one tenant
per collector lifetime).
| Method | Purpose |
|---|---|
| new() | Empty, no pre-allocation |
| with_capacity(cap) | Pre-allocated |
| with_max_capacity(max) | Hard ceiling, silently drops excess |
| emit(event) -> bool | Push event; returns false at capacity |
| drain() -> Vec<..> | O(1) pointer swap via mem::take |
| drain_with(f) | Iterate and clear, preserving allocation for reuse |
| len(), is_empty(), iter() | Inspection |
| Backend Operation | Event(s) Emitted |
|---|---|
| acquire_and_restore_into | ShardAcquired |
| checkpoint | ShardCheckpointed |
| complete | ShardCompleted |
| park_shard | ShardParked |
| split_replace | ShardSplit |
| split_residual | ShardResidualCreated |
| renew | ShardLeaseRenewed |
| create_run | RunCreated |
| register_shards | RunActivated |
WorkerSession is an ergonomic wrapper that captures (backend, tenant, worker, lease) at shard acquisition time and threads them
through every subsequent call, eliminating repetitive boilerplate and
preventing tenant/lease mismatches.
- Generic over backend — monomorphized per-backend, no trait-object overhead.
- Exclusive borrow (&'b mut B) — the Rust borrow checker enforces a single session per backend reference.
- No automatic lease renewal — the caller must call renew() explicitly.
- Move semantics for terminal ops — complete, park, and split_replace consume self, making double-use a compile error.
- Borrow semantics for non-terminal ops — split_residual takes &mut self so the session stays active with a narrowed range.
- Snapshot staleness — the snapshot reflects acquisition time; it is not updated by checkpoint, but is refreshed after split_residual.
WorkerSession::new(backend, now, tenant, key, worker)
         |
         v
   Active Session <-------+
         |                |
 renew / checkpoint       |
         |                |
         v                |
   split_residual --------+  (session stays active, snapshot narrowed)
         |
    +----+------------+
    v    v            v
complete park   split_replace
 (Done) (Parked)    (Split)
    +----+------------+
         |
  session consumed -> cannot be used again
| Method | Semantics |
|---|---|
| new(backend, now, tenant, key, worker) | Acquire shard, create session |
| from_acquire_result(backend, acquired) | Build from previously acquired snapshot |
| lease() | Active lease (deadline updated by renew) |
| initial_snapshot() | Borrowed snapshot from acquisition time |
| spec() | Key range + metadata (updated after split_residual) |
| cursor() | Cursor at acquisition time (not updated by checkpoint) |
| tenant(), worker(), shard_key() | Identity accessors |
| capacity() | Advisory backoff signal from last acquire/renew |
| renew(now) | Extend lease deadline |
| checkpoint(now, cursor, op_id) | Advance cursor (idempotent via op_id) |
| complete(self, now, cursor, op_id) | Terminal. Shard -> Done |
| park(self, now, reason, op_id) | Terminal. Shard -> Parked |
| split_replace(self, now, plan, op_id) | Terminal. Shard -> Split, creates N children |
| split_residual(&mut self, now, plan, op_id) | Non-terminal. Narrows range, creates residual child |
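A mini-model of the move/borrow rule from the table: terminal operations take self by value, so a consumed session cannot be reused, while non-terminal operations take &mut self. Types here are simplified stand-ins, not the real WorkerSession:

```rust
struct Session { shard: u64 }

impl Session {
    fn checkpoint(&mut self, cursor: &[u8]) {
        println!("shard {} checkpointed at {:?}", self.shard, cursor);
    }
    fn split_residual(&mut self) {
        println!("shard {} narrowed, session stays active", self.shard);
    }
    fn complete(self) {
        println!("shard {} -> Done, session consumed", self.shard);
    }
}

fn main() {
    let mut session = Session { shard: 7 };
    session.checkpoint(b"k1");
    session.split_residual(); // &mut self: session remains usable
    session.complete();       // self: ownership moves into the call
    // session.checkpoint(b"k2"); // would not compile: value moved by `complete`
}
```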
- Gray, C. and Cheriton, D. "Leases: An Efficient Fault-Tolerant Mechanism for Distributed File Cache Consistency." SOSP 1989.
- Kleppmann, M. "How to do distributed locking." 2016. https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
- Zhou, J. et al. "FoundationDB: A Distributed Unbundled Transactional Key Value Store." SIGMOD 2021.
- Stripe. "Designing robust and predictable APIs with idempotency." (Brandur Leach, 2017).
- Bacon, D. et al. "Spanner: Becoming a SQL System." SIGMOD 2017.
- TigerBeetle. VOPR (Viewstamped Operation Replayer) deterministic simulation.
- Lamport, L. Specifying Systems: The TLA+ Language and Tools for Hardware and Software Engineers. Addison-Wesley, 2002.
- Newcombe, C. et al. "How Amazon Web Services Uses Formal Methods." Communications of the ACM 58(4), April 2015.