
Boundary 2 -- Shard Coordination Protocol

1. Overview

Boundary 2 (Shard Coordination Protocol) manages distributed shard lifecycle, lease-based ownership, and bounded idempotent operations for the gossip-rs secret scanner. The shared data model (shard spec, cursor, pooled wrappers, manifest validation, split-replace planning core) lives in crates/gossip-contracts/src/coordination/, and the protocol layer (traits, state machine, InMemoryCoordinator, sim harness, async trait variants) lives in crates/gossip-coordination/src/. Both depend on Boundary 1 (Identity & Hashing Spine) for TenantId, ShardId, RunId, OpId, FenceEpoch, LogicalTime, and CanonicalBytes.

The durable backend lives in crates/gossip-coordination-etcd/. It provides two entrypoints: EtcdCoordinator (sync wrapper owning a single-threaded Tokio runtime) and AsyncEtcdCoordinator (async core for callers that already have a runtime). Both share the same coordination logic, static helpers, and validation code. The backend owns a real etcd client connection, a deterministic keyspace layout with typed exact-key wrappers (keyspace.rs), and an explicit binary codec (codec.rs) for persisting coordination records.

It persists create_run, register_shards, get_run* queries, claim_next_available, and the fenced acquire_and_restore_into / renew / checkpoint / split_replace / split_residual hot path directly in etcd. Shard owner bindings live in separate etcd lease-backed keys so storage-layer liveness and logical lease deadlines are checked together before accepting progress updates. split_replace also enforces a backend-local max_children_per_op cap so one atomic child publication stays bounded. Each split_replace transaction uses 4 + 2N writes and 5 + N compares (total 9 + 3N txn-ops, where N = children). The default cap of 8 yields 33 total ops; operators raising the cap must verify the total stays within the cluster's --max-txn-ops budget (etcd default 128).

The etcd config also carries the same per-tenant and global shard ceilings used by the in-memory backend. The persisted backend enforces those limits before register_shards, split_replace, and split_residual by counting existing shard-record keys under the relevant etcd prefixes, then rejecting growth that would exceed the configured caps. The remaining mutating shard-lifecycle operations (complete, park_shard) are fully persisted with terminal-state transitions, lease cleanup, and idempotent replay, matching the same CAS-guarded pattern used by the other shard operations. All RunManagement (and AsyncRunManagement) trait operations (run terminal transitions, unpark_shard) are fully persisted. The etcd backend also exposes backend-specific helpers for active-run listing and GC of stale initializing runs (see backend.rs file table below); these are inherent methods on EtcdCoordinator, not part of the trait surfaces.

The module provides seven core capabilities:

  • Shard lifecycle state machine -- a four-state automaton (Active, Done, Split, Parked) with terminal state enforcement and compile-time discriminant stability.
  • Lease-based ownership with fencing tokens -- time-bounded exclusive access enforced by monotonic FenceEpoch values that reject zombie workers (Kleppmann 2016).
  • Bounded idempotency -- a 16-entry FIFO op-log backed by RingBuffer that caches operation fingerprints for replay detection, inspired by Stripe's idempotency key pattern.
  • Deterministic split operations -- two strategies (replace and residual) that produce child shard IDs via domain-separated BLAKE3, with full coverage validation.
  • Run-level management -- two-phase creation (create_run then register_shards), progress aggregation, and terminal transitions with their own bounded op-log.
  • Tiger-style invariant enforcement -- crash-to-prevent-corruption philosophy where violated invariants panic before persistence, ensuring crash-recovery returns to the last valid state.
  • Source-family handoff boundary -- coordination emits shard claims, key-range state, and cursors that runtime layers route through gossip-contracts::connector and gossip-scanner-runtime so CLI and distributed execution share one family-oriented orchestration surface.

Core source files

gossip-coordination crate (crates/gossip-coordination/src/, excluding *_tests.rs except the intentionally documented scenario_tests.rs and test_fixtures.rs; src/sim/ is covered in simulation-harness.md)

File Role
traits.rs CoordinationBackend and AsyncCoordinationBackend traits -- the semantic contract for all backends (sync and async variants)
record.rs ShardRecord, ShardStatus, ParkReason, ShardSnapshotView
run.rs RunRecord, RunStatus, RunConfig, RunManagement and AsyncRunManagement traits
split_execution.rs Coordination-owned split execution helpers: derived shard IDs, payload hashing, result types
in_memory.rs In-memory reference implementation (executable spec)
lease.rs Lease, LeaseHolder, OpLogEntry, OpKind, OpResult
error.rs Shared CoordError, InfraError (typed infrastructure errors), and IdempotentOutcome
run_errors.rs Run-management error types
validation.rs validate_lease, validate_cursor_update_pooled, check_op_idempotency
shard_limits.rs Shared shard-count limit validation helpers and ShardLimitViolation snapshots used by both backends
events.rs EventCollector, EventKind, StateTransitionEvent
facade.rs CoordinationFacade, ShardClaiming, ClaimError
session.rs WorkerSession ergonomic wrapper with move/borrow lifecycle
conformance.rs Feature-gated backend-agnostic coordination conformance harness for SimulationBackend impls
scenario_tests.rs Multi-step coordination workflow tests covering realistic lease/run stories
test_fixtures.rs Shared seeded coordinators, canonical plans, and helper constructors used by coordination tests
lib.rs Module root and public re-exports

gossip-contracts crate (crates/gossip-contracts/src/coordination/, excluding *_tests.rs)

File Role
mod.rs Module root, re-exports, and boundary ownership map
cursor.rs Cursor type with monotonicity and bounds semantics
shard_spec.rs ShardSpec key-range type, CursorSemantics, split validation
pooled.rs PooledShardSpec, PooledCursor — arena-pooled byte-field wrappers
restored_state.rs RestoredShardState — grouped acquire/restore payload handed to runtime layers
limits.rs Capacity constants for split fan-out (MAX_SPLIT_CHILDREN, MAX_SPAWNED_PER_SHARD)
manifest.rs InitialShardInput, validate_manifest — shard manifest validation for register_shards
split.rs Contracts-owned split planner core (SplitReplacePlan, SplitResidualPlan, plan_split_replace*, plan_split_residual*)

gossip-coordination-etcd crate (crates/gossip-coordination-etcd/src/)

File Role
lib.rs Module root and public re-exports
behavioral_conformance.rs Real-etcd behavioral conformance scenarios that mirror the shared coordination harness using protocol operations plus persisted read-back oracles
backend.rs Module root for the backend/ directory; shared free functions (key comparison, CAS delay, shard capacity counting)
backend/coordinator.rs EtcdCoordinator (sync wrapper with owned Tokio runtime) and AsyncEtcdCoordinator (async core): shared low-level etcd RPC wrappers, CAS retry logic, data-access helpers (load/scan run and shard records), and inherent methods (list_active_runs_into, gc_stale_initializing_runs_into)
backend/run_management.rs RunManagement and AsyncRunManagement impl: create/register/terminate runs, unpark shards, claim candidates
backend/shard_coordination.rs CoordinationBackend and AsyncCoordinationBackend impl: shard acquisition, lease renewal, checkpointing, terminal transitions (complete, park_shard), and split operations
backend/test_support.rs Feature-gated seeding, inspection, snapshot, and deterministic split fault-injection helpers for etcd integration tests
sim_etcd_kv.rs Feature-gated in-memory model of the etcd KV subset used by coordination: point reads, prefix scans, CAS transactions, revision metadata, and lease-backed expiry for deterministic simulation
config.rs Endpoint + namespace validation plus owner-lease TTL, optimistic retry tuning, shard count limits, and bounded split fanout
keyspace.rs Deterministic ASCII etcd path construction plus typed exact-key wrappers for run/shard records, ownership, and active indexes
codec.rs Explicit binary encoding/decoding for coordination records and shard-owner bindings persisted to etcd
codec_tests.rs Round-trip, rejection, and proptest coverage for the binary codec
error.rs etcd connection, codec, lease, and transaction error surfaces
sim_coordinator.rs Feature-gated deterministic simulation adapter that runs the sync etcd backend over the in-memory KV model
test_support.rs Testcontainers-backed lifecycle helpers plus namespace-isolated coordinator builders for integration tests
tests.rs Config validation, keyspace path invariants (proptest), and ignored etcd integration tests covering acquire/checkpoint/renew/split lifecycle, unpark, collision/limit paths, fault-injected split atomicity, lease expiry, and contention

2. Shard State Machine

                 ┌──────────────┐
                 │    Active    │
                 └──┬───┬───┬──┘
                    │   │   │
         Complete   │   │   │  Park
           ┌────────┘   │   └────────┐
           ▼            │            ▼
      ┌────────┐   SplitReplace  ┌────────┐
      │  Done  │        │        │ Parked │
      └────────┘        ▼        └────────┘
                   ┌────────┐
                   │ Split  │
                   └────────┘

Transition rules

All transitions originate from Active. Done, Split, and Parked are terminal within the coordination protocol -- once a shard reaches one of these states, no CoordinationBackend operation may change its status. The method ShardRecord::assert_transition_legal panics on any attempt to transition from a terminal state to a different state.

Parked has one escape hatch: unpark_shard (via the RunManagement trait, not CoordinationBackend) transitions Parked back to Active and bumps the fence epoch to invalidate any zombie workers from a prior lease.

split_residual is special: it shrinks the parent's key range and spawns a residual child shard, but the parent stays Active. It is the only split strategy that does not retire the parent.

ShardStatus discriminants

ShardStatus uses #[repr(u8)] with stable discriminants that are persisted to durable storage. Compile-time assertions enforce the mapping:

Variant Discriminant Terminal?
Active 0 No
Done 1 Yes
Split 2 Yes
Parked 3 Yes
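
A minimal sketch of how stable #[repr(u8)] discriminants can be pinned at compile time. The variant layout mirrors the table above, but the exact assertion form used in record.rs may differ.

// Sketch: stable #[repr(u8)] discriminants pinned by compile-time assertions.
// The real ShardStatus lives in record.rs; this assertion form is illustrative.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ShardStatus {
    Active = 0,
    Done = 1,
    Split = 2,
    Parked = 3,
}

impl ShardStatus {
    pub const fn is_terminal(self) -> bool {
        !matches!(self, ShardStatus::Active)
    }
}

// Persisted discriminants must never drift across refactors.
const _: () = assert!(ShardStatus::Active as u8 == 0);
const _: () = assert!(ShardStatus::Done as u8 == 1);
const _: () = assert!(ShardStatus::Split as u8 == 2);
const _: () = assert!(ShardStatus::Parked as u8 == 3);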

ParkReason

When a shard is parked, a ParkReason (#[repr(u8)]) is stored alongside:

Variant Discriminant Meaning
PermissionDenied 0 Connector lacks access -- requires credential rotation
NotFound 1 Scan target no longer exists
Poisoned 2 Internal inconsistency -- requires manual investigation
TooManyErrors 3 Transient errors accumulated -- suitable for auto-retry
Other 4 Catch-all; backend should log additional context separately

The invariant park_reason.is_some() iff status == Parked is asserted on every state transition via assert_invariants.


3. Op-log Pattern

The coordination protocol uses bounded idempotency -- a short-term operation cache that allows workers to safely retry RPCs without causing duplicate side effects.

Structure

Each shard carries a 16-entry FIFO op-log implemented as RingBuffer<OpLogEntry, 16>. Each entry stores five fields:

(OpId, OpKind, OpResult, payload_hash: u64, executed_at: LogicalTime)
  • OpId: CSPRNG-generated idempotency key (unique per operation)
  • OpKind: discriminant identifying the operation type (Checkpoint, Complete, Park, SplitReplace, SplitResidual, Unpark)
  • OpResult: stored outcome (Completed, Error, Superseded)
  • payload_hash: BLAKE3 fingerprint of the operation's parameters
  • executed_at: logical timestamp of first execution

Replay detection tiers

When an operation arrives with (op_id, payload_hash):

  1. Same (op_id, payload_hash) -- the op-log entry matches. Return the cached result as IdempotentOutcome::Replayed. No mutation occurs.
  2. Same op_id, different payload_hash -- the caller reused an OpId with different parameters. Return OpIdConflict error. This is always a client bug.
  3. New op_id (not in the op-log) -- execute the operation, record the result in the op-log, return IdempotentOutcome::Executed.

Idempotency is checked before lease validation on every idempotent path, so that replays succeed even after the lease has expired or the shard has reached a terminal status.
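
A hedged sketch of the three replay-detection tiers. The types here are simplified stand-ins for the real OpLogEntry and check_op_idempotency in validation.rs.

// Sketch of the three replay-detection tiers against a bounded op-log slice.
#[derive(Clone, Copy, PartialEq, Eq)]
struct OpId(u128);

struct OpLogEntry {
    op_id: OpId,
    payload_hash: u64,
    cached_result: u64, // stand-in for the stored OpResult
}

enum IdempotencyCheck {
    Replayed(u64), // tier 1: same (op_id, payload_hash) -> return cached result
    Conflict,      // tier 2: same op_id, different payload_hash -> client bug
    Execute,       // tier 3: unknown op_id -> run the operation, then record it
}

fn check_op_idempotency(log: &[OpLogEntry], op_id: OpId, payload_hash: u64) -> IdempotencyCheck {
    match log.iter().find(|e| e.op_id == op_id) {
        Some(e) if e.payload_hash == payload_hash => IdempotencyCheck::Replayed(e.cached_result),
        Some(_) => IdempotencyCheck::Conflict,
        None => IdempotencyCheck::Execute,
    }
}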

Eviction semantics

When the op-log reaches capacity (16 entries), the oldest entry is evicted via O(1) ring buffer overwrite (push_back_overwrite). After eviction, the coordinator can no longer detect a retry for that OpId and treats it as a new operation.

Post-eviction re-execution is safe because:

  1. Staleness guarantee -- eviction implies the OpId is at least 16 operations old, well past any reasonable retry window.
  2. Convergent state transitions -- re-executing a shard operation either converges to the same terminal state or is rejected by status guards (e.g., completing an already-Done shard).
  3. FenceEpoch is the primary zombie defense -- the op-log is a secondary defense for in-lease retries only. A stale worker from a prior lease epoch is fenced out by epoch comparison before the op-log is ever consulted.

Payload hashing

Payload hashes use domain-separated BLAKE3 via the OP_PAYLOAD_V1 domain constant, with per-operation tags prepended as a second domain-separation layer:

Operation Tag bytes Hashed fields
checkpoint b"checkpoint" Cursor (via CanonicalBytes)
complete b"complete" Cursor (via CanonicalBytes)
park b"park" ParkReason (via CanonicalBytes)
split_replace b"split_replace" SplitReplacePlan (length-prefixed children)
split_residual b"split_residual" SplitResidualPlan (parent + residual specs)

The per-operation tag ensures that a checkpoint and a complete with the same cursor produce different hashes.
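
A sketch of the two-layer domain separation (requires the blake3 crate). The OP_PAYLOAD_V1 byte string and the exact field framing shown here are assumptions; the real scheme hashes CanonicalBytes and may frame inputs differently.

// Sketch of domain-separated payload hashing (Cargo.toml: blake3 = "1").
const OP_PAYLOAD_V1: &[u8] = b"gossip:op-payload:v1"; // assumed domain constant

fn op_payload_hash(op_tag: &[u8], canonical_payload: &[u8]) -> u64 {
    let mut hasher = blake3::Hasher::new();
    hasher.update(OP_PAYLOAD_V1);                        // layer 1: protocol-wide domain
    hasher.update(&(op_tag.len() as u64).to_le_bytes()); // length-prefix the tag
    hasher.update(op_tag);                               // layer 2: per-operation tag
    hasher.update(canonical_payload);                    // CanonicalBytes of the parameters
    let digest = hasher.finalize();
    u64::from_le_bytes(digest.as_bytes()[..8].try_into().unwrap())
}

// A checkpoint and a complete carrying the same cursor bytes hash differently:
// op_payload_hash(b"checkpoint", cursor) != op_payload_hash(b"complete", cursor)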

Why RingBuffer

Data Structure Rejection Reason
Vec O(n) eviction via remove(0) -- slides all elements
VecDeque Heap-allocated -- violates zero-alloc hot-path requirement
ArrayVec No ring semantics -- eviction requires manual shift
Option<T> array 25% memory overhead for empty slots; manual index bookkeeping

RingBuffer provides O(1) push/evict, stack-allocated [MaybeUninit<T>; N] storage, and power-of-2 bitwise index calculation.


4. Run Lifecycle

 Initializing ──register_shards──→ Active
      │                              │
      │ cancel_run          ┌────────┼────────┐
      ▼                 complete fail_run  cancel
   Cancelled                │        │        │
                            ▼        ▼        ▼
                          Done    Failed  Cancelled

A "run" is a single scan invocation -- it groups a set of shards that collectively cover the target data source. The coordinator tracks run status, validates shard manifests, and provides progress aggregation.

RunStatus discriminants

RunStatus uses #[repr(u8)] with stable discriminants:

Variant Discriminant Terminal?
Initializing 0 No
Active 1 No
Done 2 Yes
Failed 3 Yes
Cancelled 4 Yes

Two-phase creation

Run creation is a two-phase process (D2.20):

  1. create_run(tenant, run_id, config) -- creates a RunRecord in Initializing status with no shards. The RunConfig carries cursor_semantics, lease_duration, and max_shard_retries.
  2. register_shards(tenant, run_id, shards, op_id) -- validates the shard manifest (uniqueness, non-empty, bounded key ranges), creates ShardRecord entries, and transitions the run to Active. This step is idempotent via OpId.
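
A hedged sketch of the two-phase call sequence. The trait, config fields, and ID types below are simplified stand-ins, not the exact surface defined in run.rs.

// Sketch of the two-phase sequence against a simplified RunManagement surface.
struct RunConfig { lease_duration: u64, max_shard_retries: u32 }
struct ShardManifestEntry { shard_id: u64, start: Vec<u8>, end: Vec<u8> }

trait RunManagementSketch {
    type Err;
    fn create_run(&mut self, tenant: u64, run: u64, config: RunConfig) -> Result<(), Self::Err>;
    fn register_shards(
        &mut self,
        tenant: u64,
        run: u64,
        shards: &[ShardManifestEntry],
        op_id: u128,
    ) -> Result<(), Self::Err>;
}

// Phase 1 creates the run in Initializing; phase 2 validates the manifest,
// creates the ShardRecords, and transitions the run to Active (idempotent via op_id).
fn start_run<B: RunManagementSketch>(
    backend: &mut B,
    tenant: u64,
    run: u64,
    shards: &[ShardManifestEntry],
    op_id: u128,
) -> Result<(), B::Err> {
    backend.create_run(tenant, run, RunConfig { lease_duration: 300, max_shard_retries: 3 })?;
    backend.register_shards(tenant, run, shards, op_id)
}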

RunRecord

RunRecord is the coordinator's authoritative record for a run. It contains:

  • Identity: tenant, run (RunId)
  • Configuration: config (RunConfig)
  • Lifecycle: status (RunStatus), created_at, completed_at
  • Shard manifest: root_shards (Vec of ShardId)
  • Idempotency: op_log (RingBuffer of RunOpLogEntry, cap: 8)

RunManagement trait

RunManagement is a separate trait from CoordinationBackend (D2.22). It defines run-level and admin operations. AsyncRunManagement provides an identical async counterpart for I/O-bound backends:

Method Description Idempotent? Lease-gated?
create_run Create run in Initializing status No No
register_shards Populate shards, transition to Active Yes (OpId) No
get_run Return run record (read-only) N/A No
get_run_progress Aggregate shard status counts N/A No
list_shards_into Return filtered shard summaries N/A No
complete_run Transition Active to Done Yes (OpId) No
fail_run Transition Active to Failed Yes (OpId) No
cancel_run Transition non-terminal to Cancelled Yes (OpId) No
unpark_shard Resume Parked shard to Active Yes (OpId) No
create_run_with_shards Convenience: create + register Yes (OpId) No
collect_claim_candidates_into Hot-path claim candidate scan N/A No

Admin operations (unpark, cancel) are not lease-gated (D2.21) -- they are coordinator-level actions that do not require a worker to hold a lease.


5. Split Operations

When a shard's key range becomes too large or unevenly distributed, the coordinator splits it into smaller shards. Two strategies exist:

Split-replace

The parent is retired (status transitions to Split) and replaced by >= 2 children that collectively cover the parent's entire key range.

Execution phases:

  1. Validate -- idempotency check, lease validation, full-coverage validation via validate_split_coverage, spawn-cap guard.
  2. Build -- derive deterministic child IDs via derive_split_shard_id with DerivedShardKind::Child and sequential indices starting at parent.spawned.len(). Construct child ShardRecord entries.
  3. Apply -- transition parent to Split (terminal), insert children into the map, update run-shard index.

After split-replace, the parent is terminal. No further operations can push op-log entries, so the split-replace op-log entry is never evicted -- guaranteeing idempotent replay detection forever.

Split-residual

The parent shrinks its key range (stays Active) and a new residual shard covers the remainder. The parent keeps its lease and continues processing.

Execution phases:

  1. Validate -- idempotency (two-tier: op-log primary, spawned probe as defense-in-depth), lease validation, coverage validation via validate_residual_split, cursor-bounds check (parent's cursor must remain within shrunk range), spawn-cap guard.
  2. Build -- derive residual ID via derive_split_shard_id with DerivedShardKind::Residual. Residual starts with Cursor::initial().
  3. Apply -- update parent's spec and spawned list, insert residual into map, update run-shard index. Parent keeps its lease.

Because the parent stays Active, subsequent operations can evict the split-residual op-log entry. The find_replayed_residual function provides a secondary replay detection path by scanning parent.spawned for a residual derived from the same OpId.

Derived shard IDs

Child and residual shard IDs are derived deterministically via derive_split_shard_id, which uses domain-separated BLAKE3 (SPLIT_ID_V1) with five inputs: (run, parent_shard, op_id, kind, index). The output has bit 63 set to distinguish derived shards from externally-assigned root shards.

Birthday collision bound: ~2^31.5 values before 50% collision probability (63 effective bits). Acceptable for coordination use cases where the total number of derived shards per run is bounded.
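
A sketch of the derivation, assuming the blake3 crate. SPLIT_ID_V1's byte value and the exact input framing are assumptions modelled on the description above; the real helper lives in split_execution.rs.

// Sketch of deterministic child/residual ID derivation (blake3 = "1").
const SPLIT_ID_V1: &[u8] = b"gossip:split-id:v1"; // assumed domain constant

#[derive(Clone, Copy)]
enum DerivedShardKind { Child = 0, Residual = 1 }

fn derive_split_shard_id(
    run: u64,
    parent_shard: u64,
    op_id: u128,
    kind: DerivedShardKind,
    index: u32,
) -> u64 {
    let mut hasher = blake3::Hasher::new();
    hasher.update(SPLIT_ID_V1);
    hasher.update(&run.to_le_bytes());
    hasher.update(&parent_shard.to_le_bytes());
    hasher.update(&op_id.to_le_bytes());
    hasher.update(&[kind as u8]);
    hasher.update(&index.to_le_bytes());
    let digest = hasher.finalize();
    let raw = u64::from_le_bytes(digest.as_bytes()[..8].try_into().unwrap());
    raw | (1u64 << 63) // bit 63 marks a derived (non-root) shard
}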

Constants

Constant Value Purpose
MAX_SPLIT_CHILDREN 256 Maximum children in a single split-replace
MAX_SPAWNED_PER_SHARD 1024 Maximum cumulative children + residuals per parent

Compile-time assertion: MAX_SPLIT_CHILDREN <= MAX_SPAWNED_PER_SHARD.

Memory-safety pattern

Both split operations use the remove-mutate-restore pattern: the parent record is temporarily removed from the HashMap, mutated inside a closure, then restored on both success and failure paths. This avoids holding a &mut ShardRecord (from get_mut) while also inserting new child entries into the same HashMap. If the closure panics (invariant violation), the parent is intentionally not restored -- an invariant panic indicates irrecoverable corruption.
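
A minimal sketch of the remove-mutate-restore shape. The record type and closure body are placeholders; the real code operates on ShardRecord and inserts the new children inside the closure.

use std::collections::HashMap;

fn with_removed<K, V, T, E>(
    map: &mut HashMap<K, V>,
    key: K,
    f: impl FnOnce(&mut HashMap<K, V>, &mut V) -> Result<T, E>,
) -> Result<T, E>
where
    K: std::hash::Hash + Eq,
{
    let mut record = map.remove(&key).expect("caller verified the parent exists");
    // While the parent is out of the map, the closure may insert child entries
    // without holding a &mut borrow into the same map.
    let result = f(map, &mut record);
    // Restore on both success and failure; an invariant panic inside `f`
    // intentionally skips this insert (corruption must not be persisted).
    map.insert(key, record);
    result
}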


6. Lease and Fencing Protocol

Lease structure

A Lease is the capability returned by acquire_and_restore_into and required by all lease-gated mutations. It contains:

  • tenant: TenantId -- tenant isolation scope
  • run: RunId, shard: ShardId -- shard identity
  • owner: WorkerId -- the worker holding the lease
  • fence: FenceEpoch -- monotonically increasing epoch (fencing token)
  • deadline: LogicalTime -- expiry time

Fields are private -- the coordinator constructs leases and workers read them via accessors.

Fencing token protocol

Every acquire_and_restore_into increments the shard's fence_epoch. Subsequent mutations must present a lease whose fence matches the record's current epoch. If the epoch does not match, the operation is rejected with StaleFence -- the worker is a zombie from a prior ownership period.

This is the "fencing token protocol" from Kleppmann (2016): all writes carry the fence epoch, and the backend rejects stale epochs.

Lease validation order

validate_lease checks preconditions in priority order to prevent information leakage:

  1. Tenant isolation (SEC-1) -- wrong-tenant requests are rejected before any internal state is revealed
  2. Terminal status -- fast rejection of dead shards
  3. Fence epoch -- zombie fencing
  4. Lease expiry -- time-based rejection (now >= deadline)
  5. Owner divergence -- catches identity mismatches when epochs agree
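
A sketch of the priority ordering; the types are simplified stand-ins for the real validate_lease in validation.rs.

#[derive(PartialEq, Eq, Clone, Copy)]
struct TenantId(u64);
#[derive(PartialEq, Eq, Clone, Copy)]
struct WorkerId(u64);

struct LeaseSketch { tenant: TenantId, owner: WorkerId, fence: u64, deadline: u64 }
struct RecordSketch { tenant: TenantId, terminal: bool, fence_epoch: u64, owner: Option<WorkerId> }

enum LeaseError { TenantMismatch, TerminalStatus, StaleFence, LeaseExpired, OwnerDivergence }

fn validate_lease(now: u64, record: &RecordSketch, lease: &LeaseSketch) -> Result<(), LeaseError> {
    if lease.tenant != record.tenant { return Err(LeaseError::TenantMismatch); } // SEC-1 first
    if record.terminal { return Err(LeaseError::TerminalStatus); }               // dead shard
    if lease.fence != record.fence_epoch { return Err(LeaseError::StaleFence); } // zombie fencing
    if now >= lease.deadline { return Err(LeaseError::LeaseExpired); }           // time-based
    if record.owner != Some(lease.owner) { return Err(LeaseError::OwnerDivergence); }
    Ok(())
}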

LeaseHolder

LeaseHolder bundles owner: WorkerId and deadline: LogicalTime into a single value so that ShardRecord can store Option<LeaseHolder> instead of two separate Option fields -- making the "both-present-or-both-absent" invariant structurally impossible to violate.


7. Tiger-style Invariant Enforcement

The coordination protocol follows the crash-to-prevent-corruption philosophy: a violated invariant panics immediately, the operation is NOT persisted, and on crash-recovery the shard returns to its pre-operation state.

Every mutation path calls ShardRecord::assert_invariants() before returning. RunRecord::assert_invariants() performs analogous checks at the run level.

The 10 ShardRecord invariants

# Invariant Check
1 park_reason.is_some() iff status == Parked Status-reason consistency
2 (structural) Option<LeaseHolder> makes paired-ness implicit
3 Terminal shards must not hold a lease status.is_terminal() implies lease.is_none()
4 fence_epoch >= FenceEpoch::INITIAL Fence epoch minimum (>= 1)
5 op_log.len() <= OP_LOG_CAP Op-log bounded (defense-in-depth; RingBuffer enforces structurally)
6 status == Split implies !spawned.is_empty() Split shards must have children
7 parent.is_some() iff shard.is_derived() Biconditional: bit 63 set iff has parent
8 All entries in spawned satisfy is_derived() == true Spawned children must be derived
9 Op-log entries have unique OpId values No duplicate idempotency keys
10 spawned.len() <= MAX_SPAWNED_PER_SHARD Spawned count bounded at 1024

INV-9 is O(n^2) where n <= 16 (at most 120 comparisons) -- dominated by the per-transition persistence cost.
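
A sketch of a few of these checks; field names are stand-ins and the real ShardRecord::assert_invariants covers all ten invariants.

struct ShardSketch {
    terminal: bool,
    parked: bool,
    park_reason: Option<u8>,
    lease: Option<u64>,
    op_log_ids: Vec<u128>, // OpIds currently in the op-log (<= 16)
}

fn assert_invariants(s: &ShardSketch) {
    // INV-1: park_reason present iff the shard is Parked.
    assert_eq!(s.park_reason.is_some(), s.parked, "park_reason/status mismatch");
    // INV-3: terminal shards must not hold a lease.
    assert!(!(s.terminal && s.lease.is_some()), "terminal shard holds a lease");
    // INV-9: OpIds in the op-log are unique (O(n^2), n <= 16).
    for (i, a) in s.op_log_ids.iter().enumerate() {
        for b in &s.op_log_ids[i + 1..] {
            assert_ne!(a, b, "duplicate OpId in op-log");
        }
    }
}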

The core safety properties (mutual exclusion, zombie rejection, fence monotonicity, terminal irreversibility, split atomicity, and cursor monotonicity) are also verified exhaustively by the TLA+ model checker across all reachable states. See specs/coordination/README.md for the specification, property mapping, and instructions for running TLC.

Operational guidance

Invariant panics should be treated as critical bugs. Monitor for coordinator process crashes and alert immediately. The shard's durable state is safe (the failing operation was not persisted), but the root cause must be investigated.


8. CoordinationBackend Contract

CoordinationBackend is the core trait with 7 operations that every backend (in-memory, FoundationDB, PostgreSQL, deterministic simulator) must implement. An async counterpart, AsyncCoordinationBackend, provides identical semantics with async fn methods for I/O-bound backends (etcd, FoundationDB, PostgreSQL) that benefit from non-blocking execution:

Operation Signature (simplified) Terminal? Idempotent? Lease-gated?
acquire_and_restore_into (now, tenant, key, worker, &mut AcquireScratch) -> Result<AcquireResultView, AcquireError> No No No
renew (now, tenant, lease) -> Result<RenewResult, RenewError> No No Yes
checkpoint (now, tenant, lease, cursor, op_id) -> () No Yes (OpId) Yes
complete (now, tenant, lease, cursor, op_id) -> () Yes (Done) Yes (OpId) Yes
park_shard (now, tenant, lease, reason, op_id) -> () Yes (Parked) Yes (OpId) Yes
split_replace (now, tenant, lease, plan, op_id) -> child_ids Yes (Split) Yes (OpId) Yes
split_residual (now, tenant, lease, plan, op_id) -> residual_id No Yes (OpId) Yes
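
A simplified skeleton tying the table together. All types here are local stand-ins; the real signatures in traits.rs use Boundary 1 IDs, AcquireScratch, AcquireResultView, and operation-specific error enums.

struct Lease { fence: u64, deadline: u64 }
struct Cursor<'a> { last_key: Option<&'a [u8]>, token: Option<&'a [u8]> }

trait CoordinationBackendSketch {
    // Not idempotent: every successful acquire bumps the fence epoch.
    fn acquire_and_restore_into(&mut self, now: u64, tenant: u64, shard: u64, worker: u64,
                                scratch: &mut Vec<u8>) -> Result<Lease, String>;
    fn renew(&mut self, now: u64, tenant: u64, lease: &Lease) -> Result<u64, String>;
    // Idempotent, lease-gated mutations carry an op_id.
    fn checkpoint(&mut self, now: u64, tenant: u64, lease: &Lease,
                  cursor: Cursor<'_>, op_id: u128) -> Result<(), String>;
    fn complete(&mut self, now: u64, tenant: u64, lease: &Lease,
                cursor: Cursor<'_>, op_id: u128) -> Result<(), String>;
    fn park_shard(&mut self, now: u64, tenant: u64, lease: &Lease,
                  reason: u8, op_id: u128) -> Result<(), String>;
    fn split_replace(&mut self, now: u64, tenant: u64, lease: &Lease,
                     children: &[(Vec<u8>, Vec<u8>)], op_id: u128) -> Result<Vec<u64>, String>;
    fn split_residual(&mut self, now: u64, tenant: u64, lease: &Lease,
                      residual: (Vec<u8>, Vec<u8>), op_id: u128) -> Result<u64, String>;
}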

Operation semantics

acquire_and_restore_into -- the entry point for a worker to start or resume scanning. Verifies the shard is Active and unleased (or lease expired), increments fence_epoch, grants a new lease, and populates a caller-owned AcquireScratch buffer with the shard's last checkpointed cursor. Returns an AcquireResultView containing lease, snapshot, and capacity: CapacityHint. The scratch parameter is a key zero-alloc design choice (see allocation policy). NOT idempotent: each successful call increments the fence epoch.

renew -- extends the lease deadline without modifying shard progress. Returns RenewResult containing new_deadline and capacity: CapacityHint. The fence epoch does NOT change. Not idempotent via OpId; duplicate calls simply extend the deadline further (harmless).

checkpoint -- persists a new cursor position (idempotent). Validates cursor monotonicity (new.last_key >= old.last_key lexicographic) and cursor bounds (last_key within [spec.start, spec.end)).

complete -- marks the shard as successfully done (terminal). Records a final cursor, releases the lease, and transitions to Done.

park_shard -- halts the shard due to an error condition (terminal). Records a ParkReason, releases the lease, and transitions to Parked.

split_replace -- replaces the parent with N child shards (terminal for parent). Validates split coverage (children must exactly partition the parent's key range). Children inherit cursor_semantics from the parent.

split_residual -- shrinks the parent and spawns a residual (non-terminal for parent). Validates that the new parent range + residual range partition the old range, and that the parent's cursor remains within the shrunk range.

Safety invariants (all backends)

Every backend implementation must maintain these invariants:

  • Tenant isolation -- a request scoped to tenant A must never read or write shard records belonging to tenant B.
  • Fence epoch monotonicity -- fence_epoch is monotonically non-decreasing per shard; it increments on every ownership transfer.
  • Idempotency -- for any operation with an OpId: same (op_id, payload_hash) returns cached result; same op_id with different hash returns OpIdConflict; new op_id executes fresh.
  • Cursor monotonicity -- across checkpoints within the same lease epoch, cursor.last_key must be lexicographically non-decreasing.
  • Cursor bounds -- cursor.last_key must fall within the shard's [spec.start, spec.end).
  • Split coverage -- split children must exactly partition the parent's key range (no gaps, no overlaps).
  • Terminal irreversibility -- once a shard reaches Done, Split, or Parked, no protocol operation changes its status.

9. Cursor Semantics

The coordination layer uses CursorUpdate<'a> — a borrowed cursor view:

┌──────────────────────────────────────────────────────┐
│ last_key: Option<&'a [u8]>                           │
│   → coordinator-visible, lex-comparable              │
│   → represents the last item key fully processed     │
├──────────────────────────────────────────────────────┤
│ token: Option<&'a [u8]>                              │
│   → connector-opaque resume state                    │
│   → pagination cursor, continuation token, etc.      │
└──────────────────────────────────────────────────────┘

CursorUpdate<'a> is a borrowed view used across CoordinationBackend trait methods. Internally, the in-memory coordinator stores cursor fields as PooledCursor — a pair of Option<ByteSlot> handles into a shared ByteSlab — eliminating per-checkpoint heap allocations on the hot path. The PooledCursor::from_update and PooledCursor::last_key(slab) methods convert between the pooled representation and borrowed views at API boundaries. The connector boundary uses the owned connector::Cursor type, which converts to/from CursorUpdate.

Monotonicity rules

old.last_key new.last_key Verdict
None None OK -- no-op checkpoint
None Some(k) OK -- first progress
Some(a) Some(b) where b >= a OK -- forward progress
Some(a) Some(a) OK -- idempotent retry
Some(_) None REJECT -- reset to none
Some(a) Some(b) where b < a REJECT -- regression
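
A sketch of the monotonicity verdicts from the table; the real check in validation.rs also enforces cursor bounds against the shard spec.

fn cursor_monotonic(old_last_key: Option<&[u8]>, new_last_key: Option<&[u8]>) -> bool {
    match (old_last_key, new_last_key) {
        (None, _) => true,            // no-op checkpoint or first progress
        (Some(_), None) => false,     // reset to none: rejected
        (Some(a), Some(b)) => b >= a, // lexicographic forward progress (equal = retry)
    }
}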

CursorSemantics

A per-run configuration (#[repr(u8)]) that controls when cursor advancement counts as committed progress:

  • Completed (0) -- strongest guarantee. The cursor only advances after all work up to that point is fully processed and results are durable.
  • Dispatched (1) -- weaker but higher throughput. The cursor advances after work is durably dispatched but not necessarily fully processed.

The coordinator enforces monotonicity and bounds identically under both semantics.

ShardSpec

ShardSpec defines a shard's key range as a half-open interval [start, end) in lexicographic byte order, plus opaque connector metadata. Empty start ([]) means "start of keyspace"; empty end ([]) means "end of keyspace" (unbounded).
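
A sketch of half-open [start, end) containment with the empty-bounds convention described above; a stand-in for the real ShardSpec checks in shard_spec.rs.

fn key_in_range(key: &[u8], start: &[u8], end: &[u8]) -> bool {
    let after_start = key >= start;               // empty start ([]) admits every key
    let before_end = end.is_empty() || key < end; // empty end ([]) means unbounded
    after_start && before_end
}

// key_in_range(b"m", b"a", b"z")  == true
// key_in_range(b"z", b"a", b"z")  == false  // end is exclusive
// key_in_range(b"zz", b"a", b"")  == true   // unbounded end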

Reference: Bigtable (Chang et al., OSDI 2006), Spanner (Corbett et al., OSDI 2012), CockroachDB, FoundationDB (Zhou et al., SIGMOD 2021) -- all use half-open [start, end) byte-key ranges.


10. Design Decisions

ID Decision
D2.1 Two-layer (last_key, token) cursor structure
D2.2 ShardSpec has half-open key range [start, end) with lex-ordered byte boundaries
D2.3 Cursor monotonicity is a hard safety invariant
D2.4 Cursor bounds checking is a hard safety invariant
D2.5 A checkpoint requires a last_key
D2.6 ShardStatus: exactly 4 states (Active, Done, Split, Parked)
D2.7 park_reason.is_some() iff status == Parked
D2.8 Payload hashes use domain-separated BLAKE3 with CanonicalBytes
D2.9 AcquireScratch + AcquireResultView enable zero-alloc acquire path
D2.10 Derived shard IDs have bit 63 set
D2.11 ShardRecord is self-contained (no back-references to RunConfig)
D2.12 ShardSnapshotView includes status, spec, cursor, cursor_semantics, parent, spawned; excludes lease, fence, op_log, tenant, park_reason
D2.13 Sync trait is the primary contract (Result, not futures); AsyncCoordinationBackend and AsyncRunManagement provide async counterparts for I/O-bound backends
D2.14 Lease-gated operations take (TenantId, Lease)
D2.15 acquire_and_restore_into is the only non-lease operation
D2.16 Error types are operation-specific enums via From<CoordError>
D2.17 now: LogicalTime passed explicitly (deterministic simulation)
D2.18 RunRecord is the authoritative run record
D2.19 RunStatus: 5 states (Initializing, Active, Done, Failed, Cancelled)
D2.20 Two-phase creation: create_run then register_shards
D2.21 Admin operations not lease-gated
D2.22 RunManagement separate from CoordinationBackend
D2.23 LogicalTime passed explicitly to all mutating operations
D2.24 Shard listing returns ShardSummary (lightweight)
D2.25 RunRecord has its own op-log (cap: 8)
D2.26 Arena-pooled byte storage via ByteSlab for ShardRecord variable-size fields

11. Error Architecture

InfraError

InfraError represents backend-agnostic infrastructure failures from coordination backends. It classifies errors into two variants:

Variant Retryable? Meaning
Transient { operation, message } Yes Network timeouts, gRPC failures, CAS retry budget exhausted
Corruption { operation, message } No Codec decode failures, op-log kind mismatches, missing records

The operation field identifies the failing step as a human-readable label (e.g., "acquire.load_shard", "renew.txn"). Every operation-specific error type carries a BackendError(InfraError) variant for surfacing infrastructure failures alongside protocol errors. InfraError implements std::error::Error and is returned via Error::source() for error chain traversal.

CoordError

A shared CoordError enum with 12 variants provides the building blocks for all operation-specific errors. Variants cover tenant isolation, fencing, lease expiry, terminal status, OpId conflicts, cursor violations, split validation failures, and missing checkpoint keys.

Note: CheckpointError, CompleteError, and SplitError also carry a ResourceExhausted(SlabFull) variant from From<SlabFull> impls. This variant is not routed through CoordError — it originates directly from ByteSlab allocation failures on the pooled storage path.

Operation-specific narrowing

Each operation has its own error type that wraps CoordError via From<CoordError>. The From impls explicitly enumerate all rejected variants (no wildcard _ catch-all), so adding a new CoordError variant triggers a compile error in every From impl, forcing a conscious routing decision.

Operation Error Type OpIdConflict? Cursor Variants? Split Variants?
acquire_and_restore_into AcquireError No No No
renew RenewError No No No
checkpoint CheckpointError Yes Yes No
complete CompleteError Yes Yes No
park_shard ParkError Yes No No
split_replace SplitReplaceError Yes No Yes
split_residual SplitResidualError Yes No Yes

Note: SplitReplaceError and SplitResidualError are type aliases for the same underlying SplitError enum. They share identical variant sets.
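
A sketch of the no-wildcard narrowing pattern described above. The variant sets are reduced stand-ins for the real 12-variant CoordError and the per-operation error types.

enum CoordErrorSketch { TenantMismatch, StaleFence, LeaseExpired, TerminalStatus, OpIdConflict }

enum AcquireErrorSketch { TenantMismatch, StaleFence, LeaseExpired, TerminalStatus, Unreachable(&'static str) }

impl From<CoordErrorSketch> for AcquireErrorSketch {
    // No `_` arm: adding a CoordError variant breaks this impl until the new
    // variant is routed deliberately.
    fn from(e: CoordErrorSketch) -> Self {
        match e {
            CoordErrorSketch::TenantMismatch => AcquireErrorSketch::TenantMismatch,
            CoordErrorSketch::StaleFence => AcquireErrorSketch::StaleFence,
            CoordErrorSketch::LeaseExpired => AcquireErrorSketch::LeaseExpired,
            CoordErrorSketch::TerminalStatus => AcquireErrorSketch::TerminalStatus,
            // Acquire paths never produce OpIdConflict; surface it explicitly
            // rather than hiding it behind a wildcard.
            CoordErrorSketch::OpIdConflict => AcquireErrorSketch::Unreachable("OpIdConflict on acquire"),
        }
    }
}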

IdempotentOutcome

The IdempotentOutcome<T> wrapper distinguishes first execution from replay:

pub enum IdempotentOutcome<T> {
    Executed(T),   // first execution
    Replayed(T),   // retry -- result from op-log
}

Callers generally do not need to distinguish -- the result is the same. The distinction is useful for observability (metrics, logging).

Security redaction

  • SEC-1: TenantMismatch errors only expose expected (the caller's tenant), never the actual tenant -- prevents cross-tenant enumeration.
  • SEC-5: AlreadyLeased error redacts current_owner in both Display and Debug to prevent worker identity leakage.
  • SEC-6: OpIdConflict errors redact expected_hash and actual_hash in both Display and Debug.

12. In-Memory Reference Implementation

InMemoryCoordinator is the executable specification for the shard coordination protocol. It implements both CoordinationBackend and RunManagement.

Key characteristics

  • Single-threaded -- &mut self serializes all operations, eliminating concurrency concerns so invariants can be verified in-line.
  • Purely in-memory -- two-level AHashMap<TenantId, AHashMap<ShardKey, ShardRecord>>. No I/O, no transactions, no retries.
  • Arena-pooled byte storage -- a shared slab: ByteSlab owns all variable-size byte data (key-range start/end, metadata, cursor last-key, cursor token). ShardRecord fields hold PooledShardSpec and PooledCursor wrappers whose ByteSlot handles index into the slab. This replaces per-field Box<[u8]> heap allocations with slab operations on the checkpoint and acquire_and_restore_into hot paths (D2.26).
  • Tiger-style invariant enforcement -- every mutation path calls ShardRecord::assert_invariants() before returning.

Keying strategy

Shards use a two-level map: the outer level (TenantId) provides O(1) tenant isolation -- a wrong-tenant lookup misses at the outer map without scanning any shard records. The inner level (ShardKey) reduces hash input from 48 bytes (composite key) to 16 bytes. A total_shard_count is maintained inline for O(1) global limit checks.
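
A sketch of the two-level keying strategy. Std HashMap stands in for AHashMap and the ID types are illustrative stand-ins.

use std::collections::HashMap;

#[derive(Hash, PartialEq, Eq, Clone, Copy)]
struct TenantId(u128);
#[derive(Hash, PartialEq, Eq, Clone, Copy)]
struct ShardKey { run: u64, shard: u64 } // 16 bytes hashed per inner lookup

struct ShardsSketch {
    by_tenant: HashMap<TenantId, HashMap<ShardKey, String>>, // String stands in for ShardRecord
    total_shard_count: usize, // maintained inline for O(1) global limit checks
}

impl ShardsSketch {
    fn get(&self, tenant: TenantId, key: ShardKey) -> Option<&String> {
        // A wrong-tenant request misses at the outer map and never scans shard records.
        self.by_tenant.get(&tenant)?.get(&key)
    }
}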

Shard count limits

The coordinator enforces per-tenant and global shard count limits to prevent split-flooding (CWE-400). The in-memory backend uses check_shard_limits before every split and shard registration, accounting for temporarily-removed records in the remove-mutate-restore pattern. The etcd backend enforces the same limits by counting persisted shard-record keys under the tenant subtree and the global tenant root before publishing new shard records.


13. Additional Exported Types

The following types are re-exported from lib.rs and are part of the crate's public API but are not covered in the operation-level sections above.

CoordinatorRuntimeConfig (in_memory.rs)

Runtime configuration for the InMemoryCoordinator, controlling capacity limits and slab sizing.

pub struct CoordinatorRuntimeConfig {
    pub default_lease_duration: u64,
    pub max_shards_per_tenant: usize,
    pub max_total_shards: usize,
    pub claim_cooldown_interval: u64,
    pub slab_capacity: usize,
}

Default constants: DEFAULT_MAX_AUTO_SLAB_CAPACITY, DEFAULT_MAX_SHARDS_PER_TENANT, DEFAULT_MAX_TOTAL_SHARDS.

RunProgress (run.rs)

Tracks shard-level progress for a run, returned by RunManagement::get_run_progress. Includes a watermark field for tracking convergence.

RunTerminalEvaluation and evaluate_run_terminal (run.rs)

RunTerminalEvaluation represents the result of evaluating whether a run has reached a terminal decision point based on shard progress: StillActive while any shard remains Active, HasFailures when all shards have settled but one or more are Parked, and AllDone only when all settled shards are Done or Split. evaluate_run_terminal is the pure function that performs this evaluation.

ShardFilter (run.rs)

Predicate type for list_shards_into queries with named constructors:

Constructor Semantics
all() No filtering -- returns all shards
active() Shards in non-terminal states
available() Shards available for claiming
parked() Parked shards only

The root_only field controls whether only root (non-child) shards are included.

ShardSummary (run.rs)

Lightweight view of a shard returned by list_shards_into. Contains fields for status, spec bounds, cursor position, lease deadline, acquire count, key range boundaries, and parent/child relationships.

ShardArena and ShardSpecHandle (re-exported from gossip-contracts)

ShardArena is the slab allocator for shard spec byte data. ShardSpecHandle is a reference into the arena that provides zero-copy access to shard spec fields. Both are re-exported for use by coordination consumers that need to construct or inspect shard specs.

validate_residual_split_bounds (re-exported from gossip-contracts)

Validates that a residual split plan's key ranges are within the parent shard's bounds. Re-exported for use by split planning code outside the coordination crate.


14. Event Model (events.rs)

The coordination layer emits structured events for observability. Events are pure inert data — the coordination layer never acts on them. Side effects (metrics, notifications, dashboards) are the caller's responsibility. This keeps the coordination layer deterministic, testable, and compatible with simulation.

Design principles

  • Events are returned alongside operation results via EventCollector, not via callbacks or subscriptions.
  • Events are not wired into the CoordinationBackend trait. Backends that want emission implement companion _with_events methods accepting &mut EventCollector.
  • EventKind is #[non_exhaustive] — new variants can be added without breaking downstream.
  • EventKind must not derive Serialize because ShardCheckpointed contains RedactedKey which only redacts via Debug, not serialization.

EventKind

Variant Terminal? Category
ShardAcquired { shard, worker, fence_epoch } No Shard
ShardCheckpointed { shard, last_key: RedactedKey } No Shard
ShardResidualCreated { parent, residual } No Shard
ShardUnparked { shard, new_fence_epoch } No Shard
ShardLeaseRenewed { shard, new_deadline } No Shard
ShardCompleted { shard } Yes Shard
ShardParked { shard, reason } Yes Shard
ShardSplit { parent, children } Yes Shard
RunCreated No Run
RunActivated { shard_count } No Run
RunCompleted Yes Run
RunFailed Yes Run
RunCancelled Yes Run

Classification methods (is_shard_event, is_run_event, is_terminal) are exhaustive matches — the compiler enforces that new variants are handled.

RedactedKey

Wraps Option<Box<[u8]>> to prevent accidental logging of sensitive key material. Debug shows byte length (Some(<15 bytes>)) not contents.

StateTransitionEvent

Envelope carrying (tenant, run, at: LogicalTime, kind: EventKind). Every event can be routed by tenant, correlated to a run, and ordered by logical timestamp without pattern-matching the kind.

EventCollector

Collects events during a coordination operation. Supports optional hard capacity ceiling (with_max_capacity) — excess events are silently dropped. Panics on cross-tenant mixing (invariant: one tenant per collector lifetime).

Method Purpose
new() Empty, no pre-allocation
with_capacity(cap) Pre-allocated
with_max_capacity(max) Hard ceiling, silently drops excess
emit(event) -> bool Push event; returns false at capacity
drain() -> Vec<..> O(1) pointer swap via mem::take
drain_with(f) Iterate and clear, preserving allocation for reuse
len(), is_empty(), iter() Inspection
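
A sketch of the caller-side collect-and-drain flow: events accumulate during a _with_events call and are drained afterwards. The collector and event types here are stand-ins; the real EventCollector also enforces the one-tenant-per-collector invariant.

struct CollectorSketch { events: Vec<String>, max: Option<usize> }

impl CollectorSketch {
    fn with_max_capacity(max: usize) -> Self { Self { events: Vec::new(), max: Some(max) } }
    fn emit(&mut self, event: String) -> bool {
        if self.max.map_or(false, |m| self.events.len() >= m) { return false; } // silently drop
        self.events.push(event);
        true
    }
    fn drain(&mut self) -> Vec<String> { std::mem::take(&mut self.events) } // O(1) pointer swap
}

fn observe_checkpoint() {
    let mut collector = CollectorSketch::with_max_capacity(64);
    // ... backend.checkpoint_with_events(..., &mut collector) would emit here ...
    collector.emit("ShardCheckpointed".to_string());
    for event in collector.drain() {
        // Side effects (metrics, notifications) belong to the caller, not the coordination layer.
        println!("event: {event}");
    }
}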

Operation-to-event mapping

Backend Operation Event(s) Emitted
acquire_and_restore_into ShardAcquired
checkpoint ShardCheckpointed
complete ShardCompleted
park_shard ShardParked
split_replace ShardSplit
split_residual ShardResidualCreated
renew ShardLeaseRenewed
create_run RunCreated
register_shards RunActivated

15. WorkerSession (session.rs)

WorkerSession is an ergonomic wrapper that captures (backend, tenant, worker, lease) at shard acquisition time and threads them through every subsequent call, eliminating repetitive boilerplate and preventing tenant/lease mismatches.

Design properties

  • Generic over backend — monomorphized per-backend, no trait-object overhead.
  • Exclusive borrow (&'b mut B) — the Rust borrow checker enforces single session per backend reference.
  • No automatic lease renewal — caller must call renew() explicitly.
  • Move semantics for terminal ops -- complete, park, split_replace consume self, making double-use a compile error.
  • Borrow semantics for non-terminal ops -- split_residual takes &mut self so the session stays active with a narrowed range.
  • Snapshot staleness — snapshot reflects acquisition time, is not updated by checkpoint, but is refreshed after split_residual.

Lifecycle

WorkerSession::new(backend, now, tenant, key, worker)
          |
          v
       Active Session <--+
          |               |
    renew / checkpoint    |
          |               |
          v               |
    split_residual -------+  (session stays active, snapshot narrowed)
          |
   +------+----------+
   v      v          v
complete  park   split_replace
(Done)  (Parked)   (Split)
   +------+----------+
     session consumed -> cannot be used again

Public API

Method Semantics
new(backend, now, tenant, key, worker) Acquire shard, create session
from_acquire_result(backend, acquired) Build from previously acquired snapshot
lease() Active lease (deadline updated by renew)
initial_snapshot() Borrowed snapshot from acquisition time
spec() Key range + metadata (updated after split_residual)
cursor() Cursor at acquisition time (not updated by checkpoint)
tenant(), worker(), shard_key() Identity accessors
capacity() Advisory backoff signal from last acquire/renew
renew(now) Extend lease deadline
checkpoint(now, cursor, op_id) Advance cursor (idempotent via op_id)
complete(self, now, cursor, op_id) Terminal. Shard -> Done
park(self, now, reason, op_id) Terminal. Shard -> Parked
split_replace(self, now, plan, op_id) Terminal. Shard -> Split, creates N children
split_residual(&mut self, now, plan, op_id) Non-terminal. Narrows range, creates residual child
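
A sketch of the move/borrow lifecycle the table encodes: terminal operations consume the session, split_residual borrows it. SessionSketch is a stand-in for WorkerSession; the real methods also take now, cursors, plans, and OpIds.

struct SessionSketch;

impl SessionSketch {
    fn checkpoint(&mut self) { /* advance cursor; session stays usable */ }
    fn split_residual(&mut self) { /* narrow range; session stays usable */ }
    fn complete(self) { /* terminal: shard -> Done; session consumed */ }
}

fn drive(mut session: SessionSketch) {
    session.checkpoint();
    session.split_residual(); // &mut self: keep working the narrowed range
    session.complete();       // self: consumed here...
    // session.checkpoint();  // ...so this line would not compile (use after move)
}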

16. References

  • Gray, C. and Cheriton, D. "Leases: An Efficient Fault-Tolerant Mechanism for Distributed File Cache Consistency." SOSP 1989.
  • Kleppmann, M. "How to do distributed locking." 2016. https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
  • Zhou, J. et al. "FoundationDB: A Distributed Unbundled Transactional Key Value Store." SIGMOD 2021.
  • Stripe. "Designing robust and predictable APIs with idempotency." (Brandur Leach, 2017).
  • Bacon, D. et al. "Spanner: Becoming a SQL System." SIGMOD 2017.
  • TigerBeetle. VOPR (Viewstamped Operation Replayer) deterministic simulation.
  • Lamport, L. Specifying Systems: The TLA+ Language and Tools for Hardware and Software Engineers. Addison-Wesley, 2002.
  • Newcombe, C. et al. "How Amazon Web Services Uses Formal Methods." Communications of the ACM 58(4), April 2015.