The task_graph module defines a typed, state-machine-based task model for the scanner scheduler. Rather than generic closures, tasks are explicit Rust enum variants that track object context, frontier permits, and buffer lifetimes. This design ensures memory safety, enables introspection for metrics, and avoids unnecessary heap allocations.
Core abstraction: Objects flow through a deterministic state machine as they are discovered, fetched, and scanned. Each state transition is triggered by specific events (discovery, I/O completion, scan completion).
This module defines the task/data model and frontier primitives. Backend-specific worker execution (for example, local filesystem archive dispatch) is implemented in scheduler runtime modules.
┌─────────────────────────────────┐
│ Frontier Permit Acquired │
│ (Object Context Created) │
└────────────┬────────────────────┘
│
▼
┌────────────────┐
│ FetchSync(0) │
│ Read Chunk 0 │
└────────┬───────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Scan(0) │ │FetchSync(N) │ │ Archive Entry? │
│ Process Data │ │ Read Next │ │ Spawn Enumerate │
└──────┬───────┘ │ │ │ for Nested Objs │
│ └──────┬───────┘ └─────────────────┘
│ │
│ ▼
│ ┌──────────────┐
│ │ Scan(N) │
│ │ Final Chunk │
│ └──────┬───────┘
│ │
└────────┬────────┘
│
▼
┌────────────────────────────┐
│ ObjectCtx Last Ref Dropped │
│ Frontier Permit Released │
│ Object Lifecycle Complete │
└────────────────────────────┘
- Trigger: Enumeration discovers objects from a source
- Precondition: `frontier.try_acquire_ctx()` succeeds
- Action:
  - Create `ObjectCtx` wrapping `ObjectDescriptor` + `ObjectPermit`
  - Wrap in `Arc<ObjectCtx>` (`ObjectRef`)
  - Enqueue `FetchSync { obj, offset: 0 }`
- Postcondition: Frontier in_flight count increments
- Trigger: Read of a buffer from storage completes
- Action:
  - Clone `ObjectRef` for Scan task
  - Enqueue `Scan { obj, buffer, base_offset, len, prefix_len }`
  - If more data remains: clone `ObjectRef` and enqueue next `FetchSync`
- Key Point: `ObjectRef` is cloned (atomic increment), not deep copied
- Trigger: Scan task execution
- Action:
  - Run detection engine on buffer
  - Emit findings
  - Return buffer to pool
  - May spawn `FetchSync` or `Enumerate` for nested objects (archives)
- Postcondition: Scan task drops its `ObjectRef`
- Trigger: Last task holding `ObjectRef` completes
- Mechanism: `Arc` drop detects ref count = 0, calls `ObjectCtx::drop()`
- Effect: `ObjectPermit` in `ObjectCtx` drops, releasing frontier quota
- Guarantee: Exactly once per object
Before an object can transition from Enumerate to FetchSync, a frontier permit must be acquired:
// In Enumerate task
if let Some(obj_ref) = frontier.try_acquire_ctx(descriptor, file_id) {
spawn(Task::FetchSync { obj: obj_ref, offset: 0 });
} else {
// Frontier at capacity; re-enqueue self for later
spawn(Task::Enumerate { source_id, cursor });
return;
}

Non-blocking requirement: Enumerate MUST use `try_acquire_ctx()`, never `acquire()`. Blocking on the frontier from executor threads causes deadlock.
// After successful read
let obj_for_scan = Arc::clone(&obj);
spawn(Task::Scan {
obj: obj_for_scan,
buffer,
base_offset,
len,
prefix_len
});
// If more data remains
if offset + CHUNK_SIZE < file_size {
let obj_for_next = Arc::clone(&obj);
spawn(Task::FetchSync {
obj: obj_for_next,
offset: offset + CHUNK_SIZE
});
}

Arc cloning: Each clone increments the reference count atomically. The original `obj` can drop without releasing the permit as long as other clones exist.
During Scan, if detection finds an archive or nested container:
// Inside Scan task
if detection_finds_archive(buffer) {
// Spawn Enumerate for archive contents
spawn(Task::Enumerate {
source_id: nested_source_id,
cursor: EnumCursor::Start
});
}

The nested enumeration acquires its own frontier permits independently.
Dispatch note: in the local filesystem scheduler, archive containers are
detected by extension/magic and routed through dispatch_archive_scan, which
performs real archive scanning for supported formats and policy-driven skip or
partial behavior for unsupported/budget-limited cases.
Abort policy note: when archive policies trigger FailRun, the local
scheduler sets a shared abort flag. Discovery stops enqueuing new files and
workers skip further processing, providing a deterministic run-wide abort
without leaking permits.
When the frontier is at capacity, Enumerate does not block or drop work:
// Try to spawn objects
for object in batch {
if let Some(obj_ref) = frontier.try_acquire_ctx(descriptor, file_id) {
spawn(Task::FetchSync { obj: obj_ref, offset: 0 });
spawned_count += 1;
} else {
// Frontier full; re-enqueue Enumerate and stop
let remaining_cursor = EnumCursor::Continue(Box::new(updated_state));
spawn(Task::Enumerate { source_id, cursor: remaining_cursor });
break;
}
}

This is work-conserving: no objects are dropped, only delayed.
- Guarantee: Every discovered object eventually scans (or is explicitly cancelled).
- Mechanism: Frontier permits are refcounted; `Enumerate` re-enqueues on backpressure.
- Violation: If an object's `Arc` is dropped without releasing its permit, frontier capacity leaks.
- Guarantee: Number of in-flight objects ≤ `frontier.capacity()`
- Mechanism: `frontier.try_acquire()` returns `None` if capacity is exhausted.
- Verification: Enumerate re-enqueues; never blocks waiting for space.
- Guarantee: Each frontier permit is released exactly once.
- Mechanism: `ObjectPermit` is RAII and lives inside `ObjectCtx`. When the `Arc<ObjectCtx>` strong count reaches 0, `ObjectCtx::drop()` runs, releasing the permit.
- Invariant: The permit lifetime exactly matches the lifetime of all tasks touching the object.
Chain example (3-chunk object, 2 scans):
FetchSync(0) [obj_ref count = 1, permit held]
├─ clone → FetchSync(1) [count = 2]
├─ clone → Scan(0) [count = 3]
└─ drop [FetchSync(0) completes, count = 2]
FetchSync(1) [count = 2, permit held]
├─ clone → Scan(1) [count = 3]
└─ drop [FetchSync(1) completes, count = 2]
Scan(0) [count = 2, permit held]
└─ drop [Scan(0) completes, count = 1]
Scan(1) [count = 1, permit held]
└─ drop [Scan(1) completes, count = 0]
[ObjectCtx drops, permit released]
- Guarantee: Task queue size is predictable.
- Mechanism:
  - `ObjectRef` is 8 bytes (just an Arc pointer), not 24+ bytes (`PathBuf`).
  - Tasks are packed efficiently into deques.
  - `MAX_FETCH_SPAWNS_PER_ENUM = ENUM_BATCH_SIZE = 32` defines the intended per-enumerate spawn cap for handlers that apply these constants.
- Guarantee: Enumerate tasks never block on frontier.acquire().
- Mechanism: Uses `try_acquire_ctx()` only; re-enqueues on failure.
- Correctness: Prevents deadlock when all executor threads are running Enumerate tasks.
- Enumerate: Discover file path, size hint
- FetchSync(0): Read first chunk (0..CHUNK_SIZE)
- Scan(0): Detect patterns, emit findings
- FetchSync(1): (if file > CHUNK_SIZE) Read next chunk
- Scan(1): Continue scanning with overlap prefix for pattern boundary crossing
- ...repeat until EOF...
- All tasks drop → ObjectCtx drops → Permit released
- Enumerate: Discover archive.zip
- FetchSync(0): Read archive chunks
- Scan(0): Detection recognizes archive structure
- Spawns: `Enumerate { source_id: nested_source_id, ... }`
- Nested Enumerate: Extract and enumerate archive contents
- For each nested object: Repeat regular file flow above
- Archive object + nested objects: Each maintains own frontier permit
Within a single file:
- Chunk 0: offset [0, CHUNK_SIZE), base_offset = 0
- Chunk 1: offset [CHUNK_SIZE, 2*CHUNK_SIZE), base_offset = CHUNK_SIZE
- Chunk N: offset with overlap prefix for deduplication
Overlap prefix (prefix_len):
- Allows patterns to span chunk boundaries
- Scan task receives `buffer[0..len]`, where `buffer[0..prefix_len]` is overlap carried from prior bytes (copied in the local-fs flow; range re-read in the remote flow)
- `bytes_scanned` metric counts only non-overlap bytes
The executor runs tasks from work-stealing deques:
┌──────────────────────────────┐
│ Executor (N worker threads)│
├──────────────────────────────┤
│ Global task queue │
│ ┌────────────────────────────┤
│ │ Enumerate { source_id, ..} │
│ │ FetchSync { obj, offset } │
│ │ Scan { obj, buffer, ... } │
│ └────────────────────────────┤
└──────────────────────────────┘
│
Dequeue
│
▼
┌──────────────────────────────┐
│ Match Task variant │
├──────────────────────────────┤
│ Enumerate → enumerate() │
│ FetchSync → fetch_sync() │
│ Scan → scan() │
└──────────────────────────────┘
Executor Threads ObjectFrontier (Arc<CountBudget>)
──────────────── │
│ │
Enumerate task ─try_acquire_ctx─▶│
│ ┌─────────┼─────────┐
│ │ │ │
├─ Success ──────▶ │ Spawn │ Deduct │
│ (Some) │ FetchSync│ quota │
│ │ │ │
├─ Failure ──────▶│ Re-enq │ Return │
│ (None) │ self │ later │
│ └─────────┼─────────┘
│
FetchSync task
│ (completes)
▼
Arc clone
│ (dropped)
│ (if last ref)
▼
ObjectCtx::drop()
│
▼
ObjectPermit::drop()
│
▼
CountBudget::release(1)
- ObjectFrontier: Backed by `Arc<CountBudget>` (`Mutex` + `Condvar` budget; `try_acquire` is non-blocking but lock-based)
- ObjectRef cloning: Atomic refcount increment via `Arc::clone` (no heap allocation)
- Task enqueueing: Work-stealing deque operations in the executor
- Role: Anchor for all metadata about an in-flight object
- Fields:
  - `descriptor: ObjectDescriptor` - path, size hint, IDs
  - `_permit: ObjectPermit` - RAII slot in frontier (not readable, just held)
  - `file_id: FileId` - handle for scan engine
- Lifetime: Created when the frontier permit is acquired, dropped when the last `Arc<ObjectCtx>` drops
- Cloning: Wrapped in `Arc` for cheap sharing across tasks
- Type: `Arc<ObjectCtx>`
- Role: Cheap shared reference to object state
- Size: 8 bytes (just a pointer)
- Clone cost: Atomic refcount increment (no allocation)
- Size: asserted `<= 128` bytes in unit tests (`task_size_is_reasonable`)
- Variants:
  enum Task {
      Enumerate { source_id, cursor },
      FetchSync { obj, offset },
      Scan { obj, buffer, base_offset, len, prefix_len },
  }
- Role: Typed representation of work unit
- Advantage: Introspectable for metrics; no heap indirection vs boxed closure
- Size: asserted `<= 64` bytes in unit tests (`object_descriptor_size_is_reasonable`)
- Role: Metadata about discovered object
- Fields:
  - `path: PathBuf` - filesystem or URI path
  - `size_hint: u64` - discovered size (may differ from actual)
  - `object_id: ObjectId` - run-scoped unique ID (`{ source: SourceId(u32), idx: u32 }`)
  - `source_id: SourceId` - which source this came from
- Role: Bounded in-flight object quota
- Backed by: `Arc<CountBudget>` (`Mutex` + `Condvar` permit budget)
- API:
  - `try_acquire_ctx()` - non-blocking, returns `Option<ObjectRef>`
  - `acquire()` - blocking, use only from non-executor threads
  - `in_flight()` / `capacity()` / `available()` - metrics queries
- Role: Resumable enumeration state
- Variants:
  - `Start` - begin enumeration
  - `Continue(Box<CursorState>)` - resume at this state
  - `Done` - enumeration complete
- CursorState (source-specific):
  - `FsDir { dirs, entries }` - filesystem traversal stack + batch
  - `Offset(u64)` - byte offset for streaming sources
  - `Token(String)` - pagination token for APIs (S3, etc.)
- Important: Does NOT implement Clone; moving cursor is explicit
- Role: Handle to buffer from thread-safe pool
- Lifetime: Owned by Scan task; returned to pool on drop
- Size: Capped at 4 MiB (`BUFFER_LEN_MAX` from `engine_stub.rs`)
- Role: Per-worker aggregate statistics
- Fields:
  - `enumerate_count` - total Enumerate tasks run
  - `objects_discovered` - count of objects handed to FetchSync
  - `enumerate_backpressure` - re-enqueues due to frontier full
  - `fetch_sync_count` - FetchSync tasks run
  - `bytes_fetched` - total bytes read (includes overlap)
  - `scan_count` - Scan tasks run
  - `bytes_scanned` - payload bytes (excludes overlap prefix)
  - `objects_completed` - objects with permit released
- Aggregation: Per-worker metrics merged after executor shutdown
| Operation | Behavior | Notes |
|---|---|---|
| Task dispatch (match variant + call) | Enum pattern match | No task boxing in the task graph model |
| ObjectRef clone | Atomic refcount increment | Cheap Arc::clone, no object metadata copy |
| Frontier `try_acquire` | Lock + immediate return | `None` at capacity, no blocking wait |
| Frontier `acquire` | Lock + condvar wait | Blocking API for non-executor producers |
| Frontier permit drop | RAII release on drop | Returns one slot to CountBudget |
Memory Layout:
- Task enum: asserted `<= 128` bytes
- ObjectDescriptor: asserted `<= 64` bytes
- ObjectRef: 8 bytes (just an Arc pointer)
Statement: If the frontier is full, Enumerate will re-enqueue itself and the object will not be lost.
Proof:
- Frontier capacity is finite and known: `frontier.capacity()`
- `Enumerate` uses `try_acquire_ctx()`, which returns `None` when `in_flight >= capacity`
- On `None`, Enumerate re-enqueues itself with updated cursor state
- Updated cursor state preserves the batch being processed
- Eventually, permits are released (as objects complete) and `try_acquire_ctx()` succeeds
Statement: Each object's frontier permit is released exactly once, never earlier and never twice.
Proof:
- Permit is created when `frontier.try_acquire_ctx()` succeeds (once per object)
- Permit is wrapped in `ObjectPermit`, which is moved into `ObjectCtx`, not cloned
- `ObjectCtx` is wrapped in `Arc`, shared across tasks
- Permit lifetime is linked to `ObjectCtx` lifetime via RAII
- `ObjectCtx` drops only when `Arc::strong_count()` reaches 0 (once)
- `ObjectPermit::drop()` releases the permit exactly once
Statement: No executor thread running Enumerate will block waiting for frontier permits.
Proof:
- `Enumerate` uses only `frontier.try_acquire_ctx()`, never `frontier.acquire()`
- `try_acquire_ctx()` returns immediately: `Some(ctx)` or `None`
- If `None`, Enumerate re-enqueues itself and returns (no wait)
- Therefore, no executor thread blocks on frontier permits
Consequence: If all N executor threads are running, and all are in the Enumerate handler, each will re-enqueue and yield. One will eventually dequeue a non-Enumerate task.
The module includes focused unit tests for frontier semantics, permit lifetime, and size budgets:
- Frontier Acquisition: `frontier_try_acquire_does_not_block()`
  - Verifies `try_acquire()` returns `None` at capacity
- Permit Lifetime: `object_ctx_releases_permit_on_drop()`
  - Single object; drop ObjectCtx; verify permit released
- Multi-Chunk Permit Lifetime: `multi_chunk_object_permit_lifetime()`
  - Simulate FetchSync → FetchSync → Scan → Scan chain
  - Verify permit released only after all tasks drop
- Concurrent Completion: `concurrent_object_completion()`
  - 10 threads acquiring 10 objects concurrently
  - Verify all permits released when all objects done
- Size Assertions:
  - Task size ≤ 128 bytes
  - ObjectDescriptor size ≤ 64 bytes
  - Ensures cache efficiency
The task graph module defines a typed, state-machine-driven task model with these key features:
- Explicit State Machine: Enumerate → FetchSync → Scan → Completion
- Refcounted Permits: `Arc<ObjectCtx>` ensures permit release exactly once
- Work-Conserving Backpressure: Enumerate re-enqueues on frontier full, never drops work
- Non-Blocking Enumerate Path: Enumerate uses `try_acquire()` / `try_acquire_ctx()`; `acquire()` remains available for blocking producer contexts
- Introspectable Tasks: Typed enum enables metrics and debugging
- Efficient Memory: ObjectRef is 8 bytes, tasks fit in 2 cache lines
- RAII Permit Safety: Frontier slots are released by drop semantics without manual bookkeeping
This design ensures memory safety, prevents deadlocks from blocking enumerate handlers, bounds resource consumption, and makes permit lifetime correctness automatic through RAII.