The executor module implements a work-stealing thread pool that drives all scanning parallelism in the project. External producers (discovery threads, I/O completion handlers) enqueue tasks into a global injector queue, and N worker threads consume them using a local-first, steal-on-idle strategy.
The design splits into two files:
- `executor.rs` — Production types: `Executor`, `ExecutorHandle`, `WorkerCtx`, `ExecutorConfig`, the threaded worker loop, and the idle policy.
- `executor_core.rs` — Shared policy logic: the combined atomic state encoding, the `pop_task` algorithm, the `worker_step` function, trait abstractions for idle/tracing hooks, and the loom concurrency tests.

This split ensures the production executor and the deterministic simulation harness share identical scheduling semantics.
```mermaid
graph TB
    subgraph "External Producers"
        Discovery["Discovery Thread"]
        IO["I/O Completion"]
        API["API Callers"]
    end
    subgraph "Executor"
        Injector["Injector Queue<br/>(Crossbeam MPMC)"]
        subgraph "Worker 0"
            D0["Deque (LIFO local)"]
            S0["WorkerCtx<br/>scratch + metrics + rng"]
        end
        subgraph "Worker 1"
            D1["Deque (LIFO local)"]
            S1["WorkerCtx<br/>scratch + metrics + rng"]
        end
        subgraph "Worker N"
            DN["Deque (LIFO local)"]
            SN["WorkerCtx<br/>scratch + metrics + rng"]
        end
        SharedState["Shared State<br/>state · done · unparkers · panic"]
    end
    Discovery -->|spawn_external| Injector
    IO -->|ExecutorHandle::spawn| Injector
    API -->|ExecutorHandle::spawn_batch| Injector
    Injector -->|steal_batch_and_pop| D0
    Injector -->|steal_batch_and_pop| D1
    Injector -->|steal_batch_and_pop| DN
    D0 <-->|"FIFO steal"| D1
    D1 <-->|"FIFO steal"| DN
    D0 <-->|"FIFO steal"| DN
```
| Decision | Rationale |
|---|---|
| Per-worker Chase-Lev deque | LIFO local pop maximizes cache locality; FIFO steal preserves fairness |
| Global crossbeam injector | MPMC queue for external producers; batch steal amortizes lock cost |
| Combined atomic state word | Eliminates TOCTOU race between spawn and join (see Shutdown Protocol) |
| Randomized victim selection | Avoids correlated contention when all workers go idle simultaneously |
| Parker/Unparker pattern | No lost wakeups; each unpark is either consumed or becomes a no-op |
| Tiered idle (spin → yield → park) | Trades CPU burn for latency on bursty workloads |
Configuration for the executor. All defaults are conservative.
```rust
pub struct ExecutorConfig {
    pub workers: usize,         // Number of worker threads (default: 1)
    pub seed: u64,              // RNG seed for deterministic victim selection
    pub steal_tries: u32,       // Steal attempts per idle cycle (default: 4)
    pub spin_iters: u32,        // Spin iterations before park (default: 200)
    pub park_timeout: Duration, // Park timeout after spin (default: 200µs)
    pub pin_threads: bool,      // Pin workers to cores (Linux only)
}
```

Source: executor.rs
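The documented defaults could be expressed as a `Default` impl like the following sketch (the actual impl in executor.rs may differ; the seed value is taken from the defaults table later in this document):

```rust
use std::time::Duration;

/// Sketch of ExecutorConfig with the documented defaults. Field names
/// follow the struct above; the Default impl itself is an assumption.
pub struct ExecutorConfig {
    pub workers: usize,
    pub seed: u64,
    pub steal_tries: u32,
    pub spin_iters: u32,
    pub park_timeout: Duration,
    pub pin_threads: bool,
}

impl Default for ExecutorConfig {
    fn default() -> Self {
        ExecutorConfig {
            workers: 1,
            seed: 0x853c49e6748fea9b,
            steal_tries: 4,
            spin_iters: 200,
            park_timeout: Duration::from_micros(200),
            // Pinning is documented as Linux-only.
            pin_threads: cfg!(target_os = "linux"),
        }
    }
}
```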
| Knob | Workload Sensitivity |
|---|---|
| `workers` | CPU count, task CPU-boundedness |
| `steal_tries` | Task fanout pattern, worker count |
| `spin_iters` | Task latency distribution |
| `park_timeout` | External spawn frequency |
The top-level executor that owns worker threads and shared state.
```rust
pub struct Executor<T> {
    shared: Arc<Shared<T>>,
    threads: Vec<JoinHandle<WorkerMetricsLocal>>,
}
```

Source: executor.rs
Type parameter: `T` is the task type. It should be small (≤32 bytes) and `Copy` if possible. Avoid `Box<dyn FnOnce()>` — use an enum instead.
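For illustration, a hypothetical task enum in this style: plain data, `Copy`, and within the 32-byte budget (variant names are illustrative, not taken from the codebase):

```rust
/// Hypothetical task type following the sizing guidance: a small enum
/// of plain data instead of a boxed closure.
#[derive(Clone, Copy)]
pub enum ScanTask {
    File { file_id: u32, offset: u64 },
    Chunk { file_id: u32, start: u64, len: u32 },
    Flush,
}
```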
Lifecycle:
1. `Executor::new(config, scratch_init, runner)` — creates and starts worker threads immediately
2. `spawn_external(task)` or `spawn_external_batch(tasks)` — enqueue work
3. `join()` — close gate, drain in-flight tasks, collect metrics, propagate panics
Key methods:
| Method | Purpose |
|---|---|
| `new(cfg, scratch_init, runner)` | Create executor; workers start and idle immediately |
| `handle()` | Get a cloneable `ExecutorHandle<T>` for external spawning |
| `spawn_external(task)` | Convenience for `handle().spawn(task)` |
| `spawn_external_batch(tasks)` | Batch injection; amortizes wakeups |
| `shutdown()` | Signal cooperative stop; rejects future spawns |
| `join(self)` | Close gate, wait for completion, return `MetricsSnapshot` |
Thin, cloneable handle for external producers. This is the seam between I/O and CPU engines.
```rust
pub struct ExecutorHandle<T> {
    shared: Arc<Shared<T>>,
}
```

Source: executor.rs
Thread safety: `Clone + Send + Sync`. Multiple producers can call `spawn` concurrently.
Key methods:
| Method | Purpose |
|---|---|
| `spawn(task)` | CAS loop: atomically check accepting + increment count; push to injector; unpark one worker |
| `spawn_batch(tasks)` | All-or-nothing batch spawn; wakeups bounded to worker count |
| `is_accepting()` | Check if executor is still open |
| `shutdown()` | Close gate + signal done |
Error handling: `spawn` returns `Err(task)` if the executor is shutting down, giving the caller back the task.
Per-worker context passed to every task execution. Contains both user-facing state and internal scheduling machinery.
```rust
pub struct WorkerCtx<T, S> {
    // User-facing
    pub worker_id: usize,
    pub scratch: S,
    pub rng: XorShift64,
    pub metrics: WorkerMetricsLocal,
    // Internal
    local: Worker<T>,              // Chase-Lev deque
    parker: Parker,                // Crossbeam Parker
    shared: Arc<Shared<T>>,        // Shared state
    local_spawns_since_wake: u32,  // Wake-on-hoard counter
}
```

Source: executor.rs
Type parameters:
- `T`: Task type
- `S`: User-defined scratch type, initialized via the `scratch_init` closure
Key methods:
| Method | Cost | Description |
|---|---|---|
| `spawn_local(task)` | `fetch_add` + deque push | Enqueue to own deque; best cache locality |
| `spawn_global(task)` | `fetch_add` + injector push + unpark | Enqueue to global injector; higher contention |
| `handle()` | `Arc` clone | Get an `ExecutorHandle` for external spawning |
Wake-on-hoard: after `WAKE_ON_HOARD_THRESHOLD` (32) consecutive local spawns, `spawn_local` proactively wakes a sibling to prevent one worker from hoarding work while others sleep.
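A sketch of the counter logic (the function name is hypothetical; the real logic lives inside `spawn_local`):

```rust
/// Documented threshold: one proactive wakeup per 32 local spawns.
pub const WAKE_ON_HOARD_THRESHOLD: u32 = 32;

/// Returns true when this local spawn should also wake a sibling.
/// Mirrors the `local_spawns_since_wake` field on WorkerCtx.
pub fn note_local_spawn(local_spawns_since_wake: &mut u32) -> bool {
    *local_spawns_since_wake += 1;
    if *local_spawns_since_wake >= WAKE_ON_HOARD_THRESHOLD {
        *local_spawns_since_wake = 0; // reset after requesting a wakeup
        true
    } else {
        false
    }
}
```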
Shared state behind Arc, accessed by all workers and the executor owner.
```rust
struct Shared<T> {
    injector: Injector<T>,          // Global MPMC queue
    stealers: Vec<Stealer<T>>,      // Per-worker steal handles
    state: AtomicUsize,             // Combined (count << 1) | accepting
    done: AtomicBool,               // Monotonic stop flag
    unparkers: Vec<Unparker>,       // Per-worker wakeup handles
    next_unpark: AtomicUsize,       // Round-robin wakeup counter
    panic: Mutex<Option<Box<...>>>, // First captured panic
}
```

Source: executor.rs
Invariants:
- `stealers.len() == unparkers.len() == config.workers`
- `state` encodes both the accepting flag and the in-flight count
- `done` is monotonic: once `true`, never cleared
- `panic` captures only the first panic; subsequent panics are discarded
Abstraction over worker context that allows both the production WorkerCtx and the simulation harness to share the same worker_step logic.
Source: executor_core.rs
Outcome of a single worker step:
| Variant | Meaning |
|---|---|
| `RanTask { tag, source, victim }` | Executed one task; `source` is `Local`, `Injector`, or `Steal` |
| `NoWork` | No task found; idle hooks decided to continue spinning |
| `ShouldPark { timeout }` | No task found; idle hooks request parking |
| `ExitDone` | Executor is done; worker should exit |
| `ExitPanicked` | Worker caught a panic and recorded it |
Source: executor_core.rs
Where a task was obtained from: Local, Injector, or Steal.
Source: executor_core.rs
Trait for pluggable idle policies. The production implementation (TieredIdle) uses spin → yield → park. Simulation can use a no-op or deterministic equivalent.
Source: executor_core.rs
Optional trace hooks for the step engine. Production uses NoopTrace (zero overhead). Simulation can plug in a recorder for deterministic replay.
Source: executor_core.rs
```mermaid
sequenceDiagram
    participant External as External Producer
    participant Injector as Global Injector
    participant Worker as Worker Thread
    participant Local as Local Deque
    Note over External: Discovery thread or I/O handler
    External->>Injector: CAS: check accepting + increment count
    External->>Injector: injector.push(task)
    External->>Worker: unpark_one() (round-robin)
    Worker->>Local: 1. pop_local() [LIFO, O(1)]
    alt local empty
        Worker->>Injector: 2. steal_batch_and_pop() [batch, O(batch)]
        alt injector empty
            Worker->>Worker: 3. steal_from_victim(random) [FIFO, O(steal_tries)]
        end
    end
    Worker->>Worker: Execute runner(task, &mut ctx)
    Worker->>Worker: fetch_sub(COUNT_UNIT) — decrement in-flight
    opt Task spawns children
        Worker->>Local: spawn_local(child) [fast path]
        Note over Worker,Local: Wake-on-hoard every 32 local spawns
    end
    opt count==0 && !accepting
        Worker->>Worker: initiate_done()
    end
```
The `pop_task` function (executor_core.rs) tries sources in this order:

1. **Local deque (LIFO)** — Zero contention, best cache locality. Tasks spawned locally likely operate on data still in L1/L2.
2. **Global injector (batch steal)** — `steal_batch_and_pop` moves multiple tasks to the local deque, amortizing the cost of global queue access.
3. **Random victim (FIFO steal)** — Last resort. Randomized victim selection distributes steal attempts uniformly, avoiding a thundering herd on a single hot worker.
The victim selection formula ensures a worker never steals from itself:
```rust
let mut victim = rng.next_usize(n - 1);
if victim >= self_id { victim += 1; }
```
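The self-exclusion mapping can be checked in isolation; `pick_victim` here is a hypothetical wrapper with the RNG draw replaced by a plain argument:

```rust
/// Map a draw from [0, n_workers - 1) onto the worker ids that are not
/// self_id: values at or above self_id shift up by one.
pub fn pick_victim(raw: usize, n_workers: usize, self_id: usize) -> usize {
    debug_assert!(n_workers >= 2 && self_id < n_workers);
    // Stand-in for rng.next_usize(n - 1): reduce into [0, n_workers - 1).
    let mut victim = raw % (n_workers - 1);
    if victim >= self_id {
        victim += 1; // skip over self
    }
    victim
}
```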
The executor uses a single AtomicUsize to encode both the accepting flag and the in-flight task count:
```text
 63                           1     0
┌─────────────────────────────┬─────┐
│       in_flight_count       │  A  │
└─────────────────────────────┴─────┘
                                 │
                                 └── accepting bit (1=open, 0=closed)
```
Source: executor_core.rs, executor.rs
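The decoding helpers can be sketched as follows; the names mirror the `in_flight` / `is_accepting` helpers listed in the code map, but this is a reconstruction from the documented layout, not the actual source:

```rust
// Bit layout: accepting flag in bit 0, in-flight count in bits 1+.
pub const ACCEPTING_BIT: usize = 1;
pub const COUNT_UNIT: usize = 2; // one in-flight task == +2 on the word

/// True while the gate is open to external spawns.
pub fn is_accepting(state: usize) -> bool {
    state & ACCEPTING_BIT != 0
}

/// Number of in-flight tasks encoded in the word.
pub fn in_flight(state: usize) -> usize {
    state >> 1
}
```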
A naive implementation with separate atomics for "accepting" and "count" has a TOCTOU race:
```rust
// BROKEN: separate atomics leave a window between check and increment
if accepting.load() {       // (1) check — gate still open
    count.fetch_add(1);     // (2) increment
    // Another thread calls join() between (1) and (2)
    // → task spawned after join "closed the gate"
}
```
The combined state eliminates this by making the check-and-increment atomic via CAS.
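A minimal sketch of such a CAS reservation over a single `AtomicUsize` (the function name `try_reserve_spawn` is illustrative; the real constants live in executor_core.rs):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

pub const ACCEPTING_BIT: usize = 1;
pub const COUNT_UNIT: usize = 2;

/// Check the accepting bit and bump the in-flight count in one atomic
/// step. Returns false if the gate is closed (caller keeps the task).
pub fn try_reserve_spawn(state: &AtomicUsize) -> bool {
    let mut cur = state.load(Ordering::Relaxed);
    loop {
        if cur & ACCEPTING_BIT == 0 {
            return false; // gate closed: reject the spawn
        }
        match state.compare_exchange_weak(
            cur,
            cur + COUNT_UNIT,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => return true, // reserved: safe to push to the injector
            Err(seen) => cur = seen, // raced with another update; retry
        }
    }
}
```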
| Operation | Atomic | Effect |
|---|---|---|
| Init | `store(ACCEPTING_BIT)` | state = 1 (accepting, count=0) |
| External spawn | CAS loop | If accepting: count++ |
| Internal spawn | `fetch_add(COUNT_UNIT)` | count++ (workers only run when open) |
| Completion | `fetch_sub(COUNT_UNIT)` | count--; if result was count=1 && !accepting: trigger done |
| Join/close | `fetch_and(!ACCEPTING_BIT)` | Clear accepting bit |
The state machine is verified under all interleavings using loom tests in executor_core.rs:
- `spawn_vs_join_gate_atomicity` — CAS spawn racing against gate close
- `completion_vs_join_termination` — Exactly one observer triggers termination
- `concurrent_spawns_no_lost_count` — Two concurrent spawns produce count=2
- `three_way_spawn_complete_join` — Three-way race with correct terminal state
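The flavor of `concurrent_spawns_no_lost_count` can be reproduced with plain std threads. Unlike loom, which explores all interleavings, this sketch exercises only the one schedule the OS happens to pick per run:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const ACCEPTING_BIT: usize = 1;
const COUNT_UNIT: usize = 2;

/// Two threads race to CAS-spawn on the combined word; both increments
/// must survive, so the final in-flight count is 2.
pub fn concurrent_spawn_count() -> usize {
    let state = Arc::new(AtomicUsize::new(ACCEPTING_BIT));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let st = Arc::clone(&state);
            thread::spawn(move || {
                let mut cur = st.load(Ordering::Relaxed);
                loop {
                    assert!(cur & ACCEPTING_BIT != 0); // gate stays open here
                    match st.compare_exchange_weak(
                        cur,
                        cur + COUNT_UNIT,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(seen) => cur = seen,
                    }
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    state.load(Ordering::Relaxed) >> 1
}
```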
```mermaid
stateDiagram-v2
    [*] --> Accepting: init (state=0x01)
    Accepting --> Accepting: spawn (CAS: count++)
    Accepting --> Accepting: completion (fetch_sub)
    Accepting --> Draining: join() or shutdown()<br/>fetch_and(!ACCEPTING_BIT)
    Draining --> Draining: completion (count--)
    Draining --> Terminal: last completion<br/>(count 1→0, !accepting)
    Accepting --> Terminal: join() when count==0
    Terminal --> [*]: initiate_done()<br/>unpark_all → workers exit
```
1. **Close gate:** `fetch_and(!ACCEPTING_BIT)` — atomically clears the accepting bit, returning the previous state.
2. **Check terminal:** If `in_flight(prev_state) == 0`, call `initiate_done()` immediately.
3. **Join threads:** Pop each `JoinHandle`, collecting `WorkerMetricsLocal` from each worker.
4. **Merge metrics:** Aggregate into a single `MetricsSnapshot`.
5. **Propagate panic:** If any worker panicked, `resume_unwind` on the calling thread.
Cooperative stop for abandonment scenarios (e.g., lease loss):
1. `close_gate(&self.shared.state)` — reject future spawns
2. `self.shared.initiate_done()` — set `done=true`, unpark all workers
In-flight tasks may be dropped. This is not a graceful drain.
- Worker task execution is wrapped in `panic::catch_unwind`.
- On panic: decrement the in-flight count, record the panic via `Shared::record_panic`, and signal done.
- Only the first panic is captured; `join()` re-throws it after all threads have been joined.
When no work is found, workers use graduated backoff:
```mermaid
graph LR
    A[Work Found] -->|on_work| B["idle_rounds = 0"]
    C[No Work] -->|"idle_rounds <= spin_iters"| D["spin_loop() hint"]
    C -->|"idle_rounds > spin_iters"| E{"every 16th iter?"}
    E -->|yes| F["yield_now()"]
    E -->|no| G["park_timeout(200µs)"]
    F --> G
```
Source: executor.rs
| Phase | Condition | Action | Cost |
|---|---|---|---|
| Spin | `idle_rounds <= spin_iters` (200) | `spin_loop()` hint | CPU busy-wait |
| Yield | Every 16th iteration past spin threshold | `thread::yield_now()` | 1 syscall |
| Park | After spin threshold | `parker.park_timeout(200µs)` | Sleep until unparked or timeout |
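A sketch of the phase-selection logic implied by the table, assuming the "every 16th iteration" check is the `& 0xF` bitmask noted in the magic-numbers section:

```rust
#[derive(Debug, PartialEq)]
pub enum IdleAction {
    Spin,  // spin_loop() hint
    Yield, // thread::yield_now()
    Park,  // parker.park_timeout(...)
}

/// Graduated backoff: spin up to spin_iters rounds, then yield on every
/// 16th idle round and park on the rest.
pub fn idle_action(idle_rounds: u32, spin_iters: u32) -> IdleAction {
    if idle_rounds <= spin_iters {
        IdleAction::Spin
    } else if idle_rounds & 0xF == 0 {
        IdleAction::Yield
    } else {
        IdleAction::Park
    }
}
```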
Without this heuristic, a worker rapidly spawning local tasks accumulates thousands of tasks in its deque while its siblings sleep. After `WAKE_ON_HOARD_THRESHOLD` (32) consecutive local spawns, the worker calls `unpark_one()` to wake a sibling for stealing.
Source: executor_core.rs, executor.rs
| Threshold | Wakeup Rate | Overhead | Tail Latency |
|---|---|---|---|
| 8 | High | ~12.5% of spawns trigger syscall | Low |
| 32 (default) | Medium | ~3% of spawns trigger syscall | Medium |
| 128 | Low | ~0.8% of spawns trigger syscall | Higher |
`unpark_one()` uses a round-robin counter (`next_unpark`) to distribute wakeup load across workers. For power-of-two worker counts, a bitmask is used instead of modulo to avoid division on ARM.
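A sketch of that round-robin selection (the power-of-two fast path is the documented bitmask trick; the modulo fallback for other worker counts is an assumption):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Next worker index to unpark, advancing a shared round-robin counter.
/// Power-of-two worker counts use a bitmask to avoid division.
pub fn next_unpark_index(counter: &AtomicUsize, n_workers: usize) -> usize {
    let tick = counter.fetch_add(1, Ordering::Relaxed);
    if n_workers.is_power_of_two() {
        tick & (n_workers - 1) // mask: no div/mod on the hot path
    } else {
        tick % n_workers
    }
}
```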
Source: executor.rs
The primary consumer. `scan_local_with_progress` creates an `Executor<FileTask>` where:

- Task type: `FileTask` (contains `FileId` and `PathBuf`)
- Scratch type: `LocalScratch` (engine scratch, buffer pool handle, finding buffers, archive state)
- Runner: `process_file::<E>` — opens the file, reads chunks, scans with the engine, emits findings
```text
scan_local_with_progress()
│
├─ Executor::new(ExecutorConfig, scratch_init, process_file)
│   └─ Workers start; each gets LocalScratch with:
│       engine, pool, scan_scratch, pending Vec, archive state
│
├─ Discovery loop (main thread):
│   for file in source.next_file():
│       budget.acquire(1)                    ← backpressure
│       batch.push(FileTask { file_id, path })
│   ex.spawn_external_batch(batch)
│
└─ ex.join() → MetricsSnapshot
```
Source: local_fs_owner.rs
The `ExecutorConfig` is constructed from `LocalConfig`:

```rust
ExecutorConfig {
    workers: cfg.workers,
    seed: cfg.seed,
    pin_threads: cfg.pin_threads,
    ..ExecutorConfig::default() // steal_tries=4, spin_iters=200, park_timeout=200µs
}
```

Wraps `scan_local` with directory walking (gitignore, symlinks, hidden files). The `ParallelScanConfig` is converted to `LocalConfig` via `to_local_config()`, which in turn configures the executor.
```text
parallel_scan_dir(root, engine, config)
│
├─ IterWalker::new(root, &config)   ← single-threaded discovery
│
└─ scan_local(engine, walker, config.to_local_config())
    └─ Executor<FileTask> (as above)
```
Source: parallel_scan.rs
Provides `set_current_worker_id` / `current_worker_id` TLS for per-worker fast-path routing in the buffer pool. Workers set their ID at startup; non-worker threads see `None`.
Source: worker_id.rs
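A minimal sketch of such a TLS slot (a reconstruction from the description above, not the actual worker_id.rs source):

```rust
use std::cell::Cell;

thread_local! {
    // Per-thread slot; stays None on threads that never call the setter.
    static WORKER_ID: Cell<Option<usize>> = Cell::new(None);
}

/// Called once by each worker at startup.
pub fn set_current_worker_id(id: usize) {
    WORKER_ID.with(|w| w.set(Some(id)));
}

/// Fast-path lookup; non-worker threads observe None.
pub fn current_worker_id() -> Option<usize> {
    WORKER_ID.with(|w| w.get())
}
```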
Provides deterministic yield policies (`EveryN`, `NeverYield`, `AlwaysYield`, `AdaptiveYield`, `GitYieldPolicy`) for long-running tasks to voluntarily yield back to the executor. Tasks use `spawn_local` to re-enqueue themselves with updated cursor state.
Source: yield_policy.rs
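A sketch of what an `EveryN`-style policy might look like; the trait shape and the reset-on-yield behavior are assumptions:

```rust
/// Decides whether a long-running task should yield back to the executor.
pub trait YieldPolicy {
    fn should_yield(&mut self) -> bool;
}

/// Yield once every n units of work.
pub struct EveryN {
    n: u32,
    count: u32,
}

impl EveryN {
    pub fn new(n: u32) -> Self {
        EveryN { n, count: 0 }
    }
}

impl YieldPolicy for EveryN {
    fn should_yield(&mut self) -> bool {
        self.count += 1;
        if self.count >= self.n {
            self.count = 0; // the task would re-enqueue via spawn_local here
            true
        } else {
            false
        }
    }
}
```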
| Constant | Value | Location | Purpose |
|---|---|---|---|
| `ACCEPTING_BIT` | 1 | executor_core.rs | LSB in combined state word; 1 when accepting external spawns |
| `COUNT_UNIT` | 2 | executor_core.rs | Increment unit for in-flight count (count stored in bits 1+) |
| `WAKE_ON_HOARD_THRESHOLD` | 32 | executor_core.rs | Local spawns before proactively waking a sibling |
| Parameter | Default | Source |
|---|---|---|
| `workers` | 1 | executor.rs |
| `seed` | 0x853c49e6748fea9b | executor.rs |
| `steal_tries` | 4 | executor.rs |
| `spin_iters` | 200 | executor.rs |
| `park_timeout` | 200µs | executor.rs |
| `pin_threads` | true on Linux, false elsewhere | executor.rs |
| Constant | Value | Source | Purpose |
|---|---|---|---|
| Yield frequency | Every 16th idle iteration past spin threshold | executor.rs | Bitmask `& 0xF` — heuristic to avoid monopolizing the core |
| Parameter | Default | Source |
|---|---|---|
| `workers` | `num_cpus::get()` | parallel_scan.rs |
| `chunk_size` | 256 KiB | parallel_scan.rs |
| `pool_buffers` | 4 × workers | parallel_scan.rs |
| `max_in_flight_objects` | 1024 | parallel_scan.rs |
| `seed` | 0x853c49e6748fea9b | parallel_scan.rs |
| Invariant | Mechanism | Verified By |
|---|---|---|
| Work-conserving | Once spawned, a task will execute | Combined state prevents lost spawns after gate close |
| Termination detection | `in_flight` counter in combined state | Loom tests in executor_core.rs |
| No lost wakeups | Parker/Unparker pattern | Crossbeam guarantee: unpark before park becomes a no-op |
| Panic isolation | `catch_unwind` around runner | Test `panic_in_task_decrements_count` (executor.rs) |
| No TOCTOU on shutdown | Combined atomic state word | Loom test `spawn_vs_join_gate_atomicity` |
| Exactly-once termination | `prev_count == 1 && !accepting` check | Loom test `completion_vs_join_termination` |
| Counter bounded | `in_flight` never underflows | `assert!(prev_count > 0)` in `worker_step` (executor_core.rs) |
| Aspect | Cost |
|---|---|
| External spawn (CAS loop) | 1–3 CAS iterations typical |
| Injector push | O(1) amortized (crossbeam MPMC) |
| Unpark | 1 condvar notify (pthread on POSIX) |
| Local pop | O(1), zero contention |
| Batch steal from injector | O(batch_size) |
| Victim steal | O(steal_tries) worst case per idle cycle |
| Termination check | O(1) via combined atomic state |
| Item | File |
|---|---|
| `ExecutorConfig` | executor.rs |
| `ExecutorHandle<T>` | executor.rs |
| `Shared<T>` | executor.rs |
| `WorkerCtx<T, S>` | executor.rs |
| `Executor<T>` | executor.rs |
| `worker_loop` | executor.rs |
| `TieredIdle` | executor.rs |
| `ACCEPTING_BIT` / `COUNT_UNIT` | executor_core.rs |
| `WAKE_ON_HOARD_THRESHOLD` | executor_core.rs |
| `PopSource` | executor_core.rs |
| `WorkerStepResult<Tag>` | executor_core.rs |
| `ExecTraceEvent<Tag>` | executor_core.rs |
| `TraceHooks<T>` / `NoopTrace` | executor_core.rs |
| `IdleAction` / `IdleHooks` | executor_core.rs |
| `WorkerCtxLike<T, S>` | executor_core.rs |
| `in_flight()` / `is_accepting()` / `close_gate()` / `increment_count()` | executor_core.rs |
| `pop_task()` | executor_core.rs |
| `worker_step()` | executor_core.rs |
| Loom tests | executor_core.rs |
| `XorShift64` (RNG) | rng.rs |
| `CoreAssigner` / `default_pin_threads` | affinity.rs |
| `set_current_worker_id` / `current_worker_id` | worker_id.rs |
| `YieldPolicy` / `EveryN` / `NeverYield` / `AdaptiveYield` | yield_policy.rs |
| `ParallelScanConfig` | parallel_scan.rs |
| `scan_local_with_progress` (executor construction) | local_fs_owner.rs |
| `WorkerMetricsLocal` | metrics.rs |
| `MetricsSnapshot` | metrics.rs |
All paths are relative to crates/scanner-scheduler/src/scheduler/.