The GlobalResourcePool module provides centralized resource management for "fat" jobs in the scheduler: operations that require significant memory allocations beyond per-object tracking. This includes Git repository scanning, archive extraction, and multi-file container operations.
Current status: this module is implemented and tested in crates/scanner-scheduler/src/scheduler/global_resource_pool.rs, but there are no non-test call sites in the scheduler yet.
The Global Resource Pool enforces system-wide memory limits on concurrent "fat" jobs by maintaining shared budgets for:
- Scan Ring Memory: Fixed-size buffers used during repository/archive traversal (10-50 MB typical per job)
- Delta Cache Memory: Decompression and delta resolution caches (25-100 MB typical per job)
- Spill Slots: Concurrent disk-spilling operations (limited or unlimited based on configuration)

Typical fat jobs and the resources they need:
- Git Repository Scanning: Requires scan ring for commit graph traversal + delta cache for pack file resolution
- Archive Extraction: Requires scan ring for decompression buffers + spill slots if temp files needed
- Multi-File Container Scanning: ZIP/TAR operations with multiple concurrent file extractions
Resources are managed via FatJobPermit, an RAII guard that automatically releases all held resources when dropped, so nothing leaks even on job cancellation or error paths.
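
The RAII guarantee can be illustrated with a minimal, self-contained sketch. Budget, Permit, scan_step, failing_job and run_job are hypothetical stand-ins, not the crate's types. The sketch uses a single atomic byte budget and a guard whose Drop returns the bytes. It shows the release happening on an error path (an early return through `?`) and on cancellation (a future that owns the permit is dropped before it completes).

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for one pool budget (bytes currently available).
pub struct Budget {
    available: AtomicU64,
}

/// Hypothetical stand-in for FatJobPermit: holds `bytes` until dropped.
pub struct Permit {
    budget: Arc<Budget>,
    bytes: u64,
}

impl Budget {
    pub fn new(total: u64) -> Arc<Self> {
        Arc::new(Budget { available: AtomicU64::new(total) })
    }

    pub fn available(&self) -> u64 {
        self.available.load(Ordering::Acquire)
    }

    /// Non-blocking: take `bytes` if enough remain, otherwise return None.
    pub fn try_acquire(self: &Arc<Self>, bytes: u64) -> Option<Permit> {
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_sub(bytes))
            .ok()
            .map(|_| Permit { budget: Arc::clone(self), bytes })
    }
}

impl Drop for Permit {
    // Runs on normal return, early `?` return, panic unwinding, and when
    // a future that owns the permit is dropped (i.e. cancelled).
    fn drop(&mut self) {
        self.budget.available.fetch_add(self.bytes, Ordering::AcqRel);
    }
}

fn scan_step() -> Result<(), String> {
    Err("scan failed".to_string())
}

/// Error path: `?` returns early and the permit is dropped on the way out.
pub fn failing_job(permit: Permit) -> Result<(), String> {
    let _held = permit; // keep the permit alive for the job's duration
    scan_step()?;
    Ok(())
}

/// Async job that owns its permit; dropping the future releases it.
pub async fn run_job(permit: Permit) -> u64 {
    permit.bytes
}
```

No cleanup code appears in either job. The only release logic lives in Drop, which is what keeps every exit path leak-free.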
Per-object budgets (ObjectFrontier) track memory used per individual object. However, each object is checked in isolation, so they cannot prevent pathological scenarios where the total across objects exceeds available memory:

```text
Scenario: N concurrent repo jobs with B+D bytes each
Per-object limit: 500 MB per repo
Global available: 1 GB

Without global pool:
- Job 1 acquires 500 MB (limit met for this object)
- Job 2 acquires 500 MB (different object, limit met separately)
- Job 3 acquires 500 MB (different object, limit met separately)
Result: 1.5 GB allocated, but only 1 GB of system memory is available!
```
The Global Resource Pool enforces a hard cap at the scheduler level:

```text
Global Pool Configuration:
  scan_ring_total   = 200 MB (shared across ALL jobs)
  delta_cache_total = 400 MB (shared across ALL jobs)

Concurrent Jobs:
- Job 1: acquires 50 MB scan ring + 100 MB delta → permitted
- Job 2: acquires 50 MB scan ring + 100 MB delta → permitted
- Job 3: acquires 50 MB scan ring + 100 MB delta → permitted
- Job 4: acquires 50 MB scan ring + 100 MB delta → permitted (both budgets now fully committed)
- Job 5: tries 50 MB scan ring → denied (0 MB remaining)
    → re-enqueued by scheduler
```
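
The difference between per-object and global limits comes down to summing across jobs. The hypothetical admit function below is not part of the module. It models a single budget in MB, admits jobs in arrival order and never releases anything. Its only purpose is to make the arithmetic concrete.

```rust
/// Hypothetical sketch contrasting per-object checks with a global cap.
/// Each entry in `requests_mb` is one job's request; the result says which
/// jobs would be admitted, in arrival order. `global_total_mb = None`
/// models "no global pool" (per-object checks only).
pub fn admit(requests_mb: &[u64], per_object_limit_mb: u64, global_total_mb: Option<u64>) -> Vec<bool> {
    let mut committed_mb: u64 = 0;
    requests_mb
        .iter()
        .map(|&req| {
            // Per-object check: each job is judged in isolation.
            if req > per_object_limit_mb {
                return false;
            }
            // Global check: the sum over ALL admitted jobs must stay within the cap.
            if let Some(total) = global_total_mb {
                if committed_mb.saturating_add(req) > total {
                    return false;
                }
            }
            committed_mb += req;
            true
        })
        .collect()
}
```

Without a global cap, all three 500 MB jobs pass their per-object check, while a 1 GB (1024 MB) cap admits only two. With a 200 MB cap and 50 MB per job, exactly four jobs fit and a fifth is denied.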
- All-or-Nothing Acquisition: Partial permits are impossible; if any resource is unavailable, the entire request fails and previous acquisitions are rolled back
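- No Deadlock: Acquisition never blocks, and resources are always taken in the same order (scan ring → delta cache → spill slot), so no job ever waits on another while holding resources
- Leak-Free: Drop implementation releases all resources RAII-style
- Bounded Totals: For each budget separately, the amount held by all active permits never exceeds its configured limit (scan ring bytes ≤ scan_ring_bytes, delta cache bytes ≤ delta_cache_bytes, held spill slots ≤ spill_slots when limited)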

Git repository scanning:

| Aspect | Value | Notes |
|---|---|---|
| Scan Ring | 10-50 MB | Depends on commit graph depth and complexity |
| Delta Cache | 25-100 MB | Pack file resolution; larger repos need more |
| Spill Permission | Usually true | Candidate buffer may exceed memory; spill to disk |
| Fairness | Best effort | No priority queue; backoff recommended for large jobs |
Configuration Example:

```rust
let config = GlobalResourcePoolConfig {
    scan_ring_bytes: 200_000_000,   // 200 MB for ~4 concurrent repos
    delta_cache_bytes: 400_000_000, // 400 MB for complex packs
    spill_slots: Some(8),           // 8 concurrent spill ops
};
```

Archive extraction:

| Aspect | Value | Notes |
|---|---|---|
| Scan Ring | File size | Size of largest file in archive |
| Delta Cache | 0 | No delta resolution needed |
| Spill Permission | Depends on job | Only if extraction may exceed memory |
Zip note: current ZIP scanning uses file-backed random access and reuses per-worker buffers, so it does not acquire a fat-job permit. A permit would only become necessary if spill materialization is introduced for nested archives.
Request Example:

```rust
// For archive processing (size is in bytes, not MB)
let request = FatJobRequest::archive(
    10 * 1024 * 1024, // 10 MB max file size
    true,             // May spill to disk
);
```

Multi-file container scanning:

| Aspect | Value | Notes |
|---|---|---|
| Scan Ring | Largest file size | Or sum if simultaneous extraction |
| Delta Cache | 0 | Typically none for containers |
| Spill Permission | true | Temp files for decompression |
The module does not implement job type prioritization. Instead:
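- All job types compete for the same shared pool. A request succeeds if capacity is free when it is made, with no reservation or ordering between job types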
- Scheduler-level backoff should handle fairness (e.g., retry large jobs with exponential backoff if they keep failing)
- This keeps the resource pool simple and prevents priority inversion bugs
Example:

```rust
let permit = pool.try_acquire_fat_job_permit(FatJobRequest {
    scan_ring_bytes: 50_000_000,
    delta_cache_bytes: 100_000_000,
    needs_spill_slot: true,
});
match permit {
    Some(_permit) => {
        // ALL resources acquired: 50 MB scan ring, 100 MB delta, 1 spill slot
        // Keep the permit alive for the whole job; dropping it releases everything
    }
    None => {
        // NOTHING acquired; pool state unchanged
        // Re-enqueue and retry later
    }
}
```

Resources are acquired in a fixed sequence to prevent deadlock:
- Scan ring bytes - acquired first
- Delta cache bytes - acquired second (if this fails, the scan ring bytes are released)
- Spill slot - acquired third (if this fails, both byte budgets are released)

Because acquisition is sequential, a failure at step N leaves only steps 1 through N-1 to undo, and the implementation releases them before returning None.
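
The ordering and rollback rules can be sketched with plain atomics. Pool, take_bytes and take_slot are hypothetical simplifications rather than the module's ByteBudget and CountBudget. They differ in three ways: try_acquire returns a bool instead of a FatJobPermit, release is omitted, and spill is always counted (there is no unlimited mode).

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Hypothetical, simplified pool: two byte budgets plus a counted spill budget.
pub struct Pool {
    scan_ring: AtomicU64,
    delta_cache: AtomicU64,
    spill_slots: AtomicUsize,
}

/// Take `n` bytes if available; never blocks and never goes below zero.
fn take_bytes(budget: &AtomicU64, n: u64) -> bool {
    budget
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_sub(n))
        .is_ok()
}

/// Take one spill slot if any is free.
fn take_slot(slots: &AtomicUsize) -> bool {
    slots
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_sub(1))
        .is_ok()
}

impl Pool {
    pub fn new(scan_ring: u64, delta_cache: u64, spill_slots: usize) -> Self {
        Pool {
            scan_ring: AtomicU64::new(scan_ring),
            delta_cache: AtomicU64::new(delta_cache),
            spill_slots: AtomicUsize::new(spill_slots),
        }
    }

    /// All-or-nothing: true only if every requested resource was taken.
    /// A failure at step N releases steps 1..N-1 before returning false.
    pub fn try_acquire(&self, scan: u64, delta: u64, needs_spill: bool) -> bool {
        // Step 1: scan ring bytes (nothing to roll back on failure).
        if !take_bytes(&self.scan_ring, scan) {
            return false;
        }
        // Step 2: delta cache bytes; undo step 1 on failure.
        if !take_bytes(&self.delta_cache, delta) {
            self.scan_ring.fetch_add(scan, Ordering::AcqRel);
            return false;
        }
        // Step 3: spill slot, only if requested; undo steps 2 and 1 on failure.
        if needs_spill && !take_slot(&self.spill_slots) {
            self.delta_cache.fetch_add(delta, Ordering::AcqRel);
            self.scan_ring.fetch_add(scan, Ordering::AcqRel);
            return false;
        }
        true
    }

    /// (scan ring bytes, delta cache bytes, spill slots) currently available.
    pub fn available(&self) -> (u64, u64, usize) {
        (
            self.scan_ring.load(Ordering::Acquire),
            self.delta_cache.load(Ordering::Acquire),
            self.spill_slots.load(Ordering::Acquire),
        )
    }
}
```

Rolling back instead of locking has one visible side effect. Between a take and its rollback, a concurrent caller may see slightly less capacity than will remain afterwards, and its request may fail spuriously. That is tolerable here because failed acquisitions are already handled by re-enqueueing.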
```rust
impl Drop for FatJobPermit {
    fn drop(&mut self) {
        // Release scan ring bytes
        self.pool.release_scan_ring(self.scan_ring_bytes);
        // Release delta cache bytes
        self.pool.release_delta_cache(self.delta_cache_bytes);
        // The spill slot (if any) is released afterwards, when the
        // CountPermit field is dropped along with the rest of the struct
    }
}
```

A job holds its permit for its entire lifetime:

```rust
use std::path::PathBuf;

struct GitRepoJob {
    permit: FatJobPermit, // Held for entire job lifetime
    repo_path: PathBuf,
    // ... other state ...
}

impl GitRepoJob {
    fn execute(&mut self) -> Result<()> {
        // Use permit to constrain resource usage
        let allow_spill = self.permit.can_spill();
        scan_repository(&self.repo_path, allow_spill)?;
        // `execute` only borrows the job, so the permit is not released here;
        // on success or error it is released when the job itself is dropped
        Ok(())
    }
}
```

Jobs cancelled mid-execution simply drop their permit:
```rust
async fn run_job(mut job: GitRepoJob) -> Result<()> {
    // `job` owns its FatJobPermit and is owned by this future. If the future
    // is dropped before completing (cancelled), `job` is dropped with it and
    // the permit releases every resource; no manual cleanup is needed.
    job.execute()
}
```

The spill permission uses an enum to distinguish three cases:

```rust
enum SpillGrant {
    NotRequested,                     // Job didn't request spill
    Unlimited,                        // Spill enabled; no slot counting
    Limited { _permit: CountPermit }, // Spill enabled; using 1 counted slot
}
```

Usage:

```rust
permit.can_spill()        // true if Unlimited or Limited
permit.spill_is_limited() // true only if Limited
```

Scenarios:

| Config | Request | Result | Semantics |
|---|---|---|---|
| spill_slots: Some(4) | needs_spill_slot: true | Limited | Must acquire 1 of 4 slots |
| spill_slots: Some(4) | needs_spill_slot: false | NotRequested | Spill disallowed |
| spill_slots: None | needs_spill_slot: true | Unlimited | Spill allowed; no counting |
| spill_slots: None | needs_spill_slot: false | NotRequested | Spill disallowed |
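
The scenario table can be expressed as a small decision function. In this sketch, SpillGrant mirrors the documented enum. SlotPermit (standing in for CountPermit) and resolve_spill, including its free_slots parameter, are hypothetical. In the real pool the slot check happens inside try_acquire_fat_job_permit(), and a failed slot check there triggers the rollback of both byte budgets.

```rust
/// Hypothetical stand-in for CountPermit (the real one frees its slot on drop).
#[derive(Debug, PartialEq)]
pub struct SlotPermit;

/// Mirrors the documented SpillGrant variants.
#[derive(Debug, PartialEq)]
pub enum SpillGrant {
    NotRequested,
    Unlimited,
    Limited { _permit: SlotPermit },
}

impl SpillGrant {
    /// True for Unlimited or Limited.
    pub fn can_spill(&self) -> bool {
        !matches!(self, SpillGrant::NotRequested)
    }

    /// True only for Limited (a counted slot is held).
    pub fn spill_is_limited(&self) -> bool {
        matches!(self, SpillGrant::Limited { .. })
    }
}

/// One row of the scenario table per match arm. `free_slots` is the number
/// of unused counted slots (ignored when spilling is unlimited). Returns None
/// when a counted slot is required but none is free.
pub fn resolve_spill(spill_slots: Option<usize>, needs_spill_slot: bool, free_slots: usize) -> Option<SpillGrant> {
    match (spill_slots, needs_spill_slot) {
        (_, false) => Some(SpillGrant::NotRequested),
        (None, true) => Some(SpillGrant::Unlimited),
        (Some(_), true) if free_slots > 0 => Some(SpillGrant::Limited { _permit: SlotPermit }),
        (Some(_), true) => None,
    }
}
```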

```text
┌─────────────────────────────────────────┐
│ Scheduler Main Loop                     │
│                                         │
│ 1. Dequeue job from work queue          │
│ 2. Determine job type & resource needs  │
│ 3. Try to acquire global permit         │
└────────────────┬────────────────────────┘
                 │
        ┌────────▼──────────────┐
        │ Permit acquired?      │
        └─────┬────────┬────────┘
          Yes │        │ No
              │        │
     ┌────────▼──┐  ┌──▼────────────────┐
     │ Start job │  │ Enqueue backoff   │
     │ Hold      │  │ Retry later with  │
     │ permit    │  │ exponential delay │
     └─────┬─────┘  └───────────────────┘
           │
           │ Job executes
           │ (may hit ObjectFrontier limits too)
           │
     ┌─────▼──────────────────┐
     │ Job completes/error    │
     │ Permit dropped →       │
     │ Auto-release resources │
     └────────────────────────┘
```
When wired into a scheduler path, the module provides negative feedback to the scheduler:
- Job acquisition fails → try_acquire_fat_job_permit() returns None
- Scheduler response → re-enqueue the job with a backoff delay
- Why backoff? → prevents busy-spinning and gives running jobs time to release resources

```rust
// Pseudocode: scheduler's dequeue loop
while let Some(mut job) = queue.pop() {
    match pool.try_acquire_fat_job_permit(job.resource_request()) {
        Some(permit) => {
            // The permit travels with the job and is released when the job finishes
            execute_job(job, permit);
        }
        None => {
            // Resources exhausted and nothing was acquired: back off instead
            // of spinning on this job, then move on to the next one
            job.failures += 1;
            let delay = calculate_backoff(job.failures);
            queue.push_delayed(job, delay);
        }
    }
}
```

The Global Resource Pool and ObjectFrontier operate at different levels:

```text
Scheduler Flow:
  Global Resource Pool (this module)
    ↓ hard cap on total memory across all jobs
  Job Execution Starts
    ↓
  ObjectFrontier (per-object concurrency control)
    ↓ caps concurrent blob fetches per object
    ↓ tracks memory per object
  Individual Blob Processing
```
Key Insight: Both limits can trigger backpressure:
- Global pool full -> job re-enqueued (intended integration behavior)
- ObjectFrontier full -> job waits for blob completions, then acquires next

GlobalResourcePoolConfig

Purpose: Immutable configuration for pool creation.

```rust
pub struct GlobalResourcePoolConfig {
    pub scan_ring_bytes: u64,       // Total bytes for scan rings
    pub delta_cache_bytes: u64,     // Total bytes for delta caches
    pub spill_slots: Option<usize>, // Concurrent spill ops (None = unlimited)
}
```

Default Values:
- scan_ring_bytes: 256 MB
- delta_cache_bytes: 512 MB
- spill_slots: Some(16)

Methods:
- validate() - Panics if the config is invalid: zero scan_ring_bytes, zero delta_cache_bytes, or spill_slots: Some(0)
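
Below is a minimal sketch of the documented defaults and of the checks validate() performs. It follows the validation tests, which reject a zero scan ring, zero delta cache and zero spill slots. This is a stand-in definition, not the crate's source. It assumes "MB" means MiB (1024 × 1024 bytes), as in the document's configuration examples, and uses assert! to produce the panics.

```rust
/// Hypothetical mirror of GlobalResourcePoolConfig with the documented defaults.
#[derive(Debug, Clone)]
pub struct GlobalResourcePoolConfig {
    pub scan_ring_bytes: u64,
    pub delta_cache_bytes: u64,
    pub spill_slots: Option<usize>,
}

// Assumption: "MB" in the defaults means MiB.
const MIB: u64 = 1024 * 1024;

impl Default for GlobalResourcePoolConfig {
    fn default() -> Self {
        Self {
            scan_ring_bytes: 256 * MIB,
            delta_cache_bytes: 512 * MIB,
            spill_slots: Some(16),
        }
    }
}

impl GlobalResourcePoolConfig {
    /// Panics on zero byte budgets and on Some(0) spill slots.
    /// None (unlimited spilling) is accepted.
    pub fn validate(&self) {
        assert!(self.scan_ring_bytes > 0, "scan_ring_bytes must be > 0");
        assert!(self.delta_cache_bytes > 0, "delta_cache_bytes must be > 0");
        if let Some(slots) = self.spill_slots {
            assert!(slots > 0, "spill_slots must be > 0 (use None for unlimited)");
        }
    }
}
```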
Example:

```rust
let config = GlobalResourcePoolConfig {
    scan_ring_bytes: 200 * 1024 * 1024,
    delta_cache_bytes: 400 * 1024 * 1024,
    spill_slots: Some(8),
};
config.validate();
```

GlobalResourcePool

Purpose: Main resource manager; thread-safe and shared via Arc.

Thread Safety: Safe to share via Arc<GlobalResourcePool>. The byte budgets use lock-free atomic operations; the optional spill-slot budget is Mutex-guarded.
Key Methods:

new(config): Creates a new pool from configuration. Panics if config is invalid.

```rust
let pool = GlobalResourcePool::new(config);
// Returns Arc<GlobalResourcePool>
```

try_acquire_fat_job_permit(request): Attempts to acquire all requested resources. Returns None if any resource is unavailable; previously acquired resources are released.
Acquisition order:
- Scan ring bytes
- Delta cache bytes
- Spill slot (if requested)

```rust
let permit = pool.try_acquire_fat_job_permit(
    FatJobRequest::git_repo(50, 100, true), // sizes in MB
);
match permit {
    Some(_permit) => { /* resources acquired */ }
    None => { /* backoff and retry */ }
}
```

Introspection methods:
- scan_ring_available() -> u64 - Available scan ring bytes
- scan_ring_total() -> u64 - Total scan ring capacity
- delta_cache_available() -> u64 - Available delta cache bytes
- delta_cache_total() -> u64 - Total delta cache capacity
- spill_slots_available() -> Option<usize> - Available spill slots (None if unlimited)
- spill_slots_total() -> Option<usize> - Total spill slots (None if unlimited)
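Usage:

```rust
let avail_scan = pool.scan_ring_available();
let total_delta = pool.delta_cache_total();
let spill_capacity = pool.spill_slots_available();
if avail_scan < required_scan {
    // Not enough resources right now; skip this job for now
}
```

These values are snapshots for monitoring and heuristics. Another thread may acquire or release resources between the read and a later call, so only try_acquire_fat_job_permit() decides whether a request actually fits.

FatJobRequest

Purpose: Describes resource requirements for a single job.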

```rust
pub struct FatJobRequest {
    pub scan_ring_bytes: u64,
    pub delta_cache_bytes: u64,
    pub needs_spill_slot: bool,
}
```

Helper Constructors:

git_repo(): Creates a request for Git repository scanning. Sizes are in megabytes.

```rust
let request = FatJobRequest::git_repo(50, 100, true);
// 50 MB scan ring, 100 MB delta cache, spill enabled
```

Panics if the MB values would overflow when converted to bytes.
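
The overflow rule matters because a plain multiplication would wrap silently in release builds, where overflow checks are off by default. The sketch below shows the conversion. It assumes MB means MiB and that the parameters are u64. The mb_to_bytes helper and the parameter names are hypothetical.

```rust
/// Hypothetical mirror of FatJobRequest and its git_repo() helper.
pub struct FatJobRequest {
    pub scan_ring_bytes: u64,
    pub delta_cache_bytes: u64,
    pub needs_spill_slot: bool,
}

// Assumption: 1 MB = 1 MiB, matching the document's archive example.
const BYTES_PER_MB: u64 = 1024 * 1024;

fn mb_to_bytes(mb: u64) -> u64 {
    // checked_mul returns None on overflow; panic instead of silently
    // wrapping to a small (and wrong) byte count.
    mb.checked_mul(BYTES_PER_MB)
        .expect("MB value overflows u64 when converted to bytes")
}

impl FatJobRequest {
    pub fn git_repo(scan_ring_mb: u64, delta_cache_mb: u64, needs_spill_slot: bool) -> Self {
        Self {
            scan_ring_bytes: mb_to_bytes(scan_ring_mb),
            delta_cache_bytes: mb_to_bytes(delta_cache_mb),
            needs_spill_slot,
        }
    }
}
```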
archive(): Creates a request for archive extraction. The size is given in bytes.

```rust
let request = FatJobRequest::archive(10 * 1024 * 1024, true);
// 10 MB scan ring, no delta cache, spill enabled
```

FatJobPermit

Purpose: RAII guard holding acquired resources; automatically releases them on drop.
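Attributes:
- #[must_use]: The compiler warns if a permit value is created and then discarded unused. A discarded permit is dropped immediately, which would release its resources before the job runs.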
Key Methods:

scan_ring_bytes(): Returns the scan ring bytes held by this permit.

```rust
// For the 50 MB / 100 MB request shown earlier
let bytes = permit.scan_ring_bytes();
assert_eq!(bytes, 50_000_000);
```

delta_cache_bytes(): Returns the delta cache bytes held by this permit.

```rust
let bytes = permit.delta_cache_bytes();
assert_eq!(bytes, 100_000_000);
```

total_bytes(): Returns the sum of scan ring and delta cache bytes. Panics in debug builds on overflow.

```rust
let total = permit.total_bytes();
```

can_spill(): Returns whether the permit allows spilling:
- true if a counted spill slot was acquired (limited), or spill was requested and spilling is unlimited
- false if spill was not requested

```rust
if permit.can_spill() {
    enable_disk_spilling();
}
```

spill_is_limited(): Returns true only if the spill permission comes from a counted slot budget.

```rust
if permit.spill_is_limited() {
    log::debug!("Spill is limited by configured slots");
}
```

Drop Behavior:
When permit is dropped:
- Scan ring bytes are released to the pool
- Delta cache bytes are released to the pool
- Spill slot is released (if any) via CountPermit Drop
- Debug assertions check for double-release or corruption
| Operation | Implementation | Cost |
|---|---|---|
| try_acquire_fat_job_permit() | 2 lock-free atomic CAS operations (byte budgets) + optional Mutex-guarded spill slot check | O(1), constant-time |
| Release via Drop | 2 byte-budget releases + optional spill-slot release | O(1), constant-time |
- Job-level backpressure: Designed for tens of acquisitions per second (job-level), not objects per second
- Atomic contention: Low contention because acquisitions are job-scoped, not per-object
- No allocation: FatJobPermit is stack-allocated (size guard: <= 80 bytes in tests)
- FatJobPermit size: platform-dependent; a unit test asserts it stays <= 80 bytes
- GlobalResourcePool size: platform-dependent (three Arc-backed budgets)
- Budget object sizes are implementation-dependent; avoid hard-coding byte counts
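
A size guard like permit_size_is_reasonable can be written with std::mem::size_of. The stand-in types below are hypothetical, and their fields only approximate the documented ones: an Arc to the pool, two byte counts and a SpillGrant. Because layout varies by target, the check asserts an upper bound rather than an exact size.

```rust
use std::mem::size_of;
use std::sync::Arc;

// Hypothetical stand-ins shaped like the documented types; the real field
// layout is an assumption, which is why only an upper bound is checked.
pub struct Pool;

pub struct CountPermit {
    _budget: Arc<()>,
}

pub enum SpillGrant {
    NotRequested,
    Unlimited,
    Limited { _permit: CountPermit },
}

pub struct FatJobPermit {
    _pool: Arc<Pool>,
    _scan_ring_bytes: u64,
    _delta_cache_bytes: u64,
    _spill: SpillGrant,
}

/// Ceiling used by the size-guard check (the module's test uses 80 bytes).
pub const PERMIT_SIZE_CEILING: usize = 80;

/// Size of the stand-in permit; varies by target (e.g. pointer width).
pub fn permit_size() -> usize {
    size_of::<FatJobPermit>()
}
```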
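Large jobs may be starved by streams of small jobs:

Example: 200 MB pool, large job needs 120 MB
- Small job A acquires 80 MB (120 MB left)
- Small job B acquires 80 MB (40 MB left)
- Large job tries 120 MB → fails (only 40 MB left)
- Job A finishes (120 MB left), but small job C acquires 80 MB before the large job retries (40 MB left)
- Large job retries → fails again

The large job keeps failing while small jobs complete and restart.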
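Mitigation: The scheduler should apply exponential backoff to failed large-job acquisitions (e.g., retry after 10ms → 20ms → 40ms). Backoff avoids busy-spinning. However, the pool has no reservation mechanism, so backoff alone does not guarantee that the large job eventually acquires its permit.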
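
A minimal calculate_backoff (the name used in the scheduler pseudocode) might double the delay after each consecutive failure and cap it. The 10 ms base follows the 10ms → 20ms → 40ms example. The 1 s cap and the meaning of `failures` (1 = first failure) are assumptions.

```rust
use std::time::Duration;

/// Hypothetical backoff helper for failed permit acquisitions.
/// `failures` counts failed attempts so far (1 = first failure).
pub fn calculate_backoff(failures: u32) -> Duration {
    const BASE_MS: u64 = 10; // first retry after 10 ms
    const MAX_MS: u64 = 1_000; // never wait longer than 1 s between retries
    // Doubling exponent: 0 for the first failure; clamped so the shift
    // below cannot overflow a u64.
    let exp = failures.saturating_sub(1).min(32);
    let delay_ms = BASE_MS.saturating_mul(1u64 << exp).min(MAX_MS);
    Duration::from_millis(delay_ms)
}
```

The cap keeps a long-starved job from drifting into very long sleeps. Many schedulers also add random jitter so that jobs that failed together do not retry in lockstep.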
Jobs that fail to acquire resources must be re-enqueued; the pool has no built-in waiting:

```rust
match pool.try_acquire_fat_job_permit(request) {
    Some(_permit) => { /* run job */ }
    None => {
        // No blocking wait: the caller re-enqueues the job with a delay
        scheduler.enqueue_delayed(job, backoff_delay);
    }
}
```

This is consistent with ObjectFrontier design: jobs are re-enqueuable units.
Pool configuration is set at creation time and cannot be changed. To adjust limits, the pool must be recreated and active permits must be released first.
The spill_slots limit is a soft count; actual disk operations may exceed the slot count if job code doesn't respect can_spill().
The module includes comprehensive tests covering:
- basic_acquisition_and_release - Acquire and release flow
- empty_request_succeeds - No-op requests work
- global_pool_prevents_over_commit - Pool prevents exceeding limits
- partial_acquisition_releases_all - Failed acquisitions don't leak
- spill_slot_failure_releases_bytes - Spill failures roll back bytes
- spill_only_request_acquires_slot - Spill-only requests acquire slots
- unlimited_spill_mode - Unlimited spill configuration works
- concurrent_acquisition - 10 threads acquire/release concurrently; pool fully restored
- git_repo_helper - FatJobRequest::git_repo() builds the correct request
- archive_helper - FatJobRequest::archive() builds the correct request
- permit_size_is_reasonable - Guards the FatJobPermit size ceiling

Validation tests:
- Config validation rejects zero scan ring, zero delta cache, and zero spill slots
- git_repo() helper rejects MB overflow

Default:

```rust
let config = GlobalResourcePoolConfig::default();
// 256 MB scan ring, 512 MB delta cache, 16 spill slots
```

Smaller budgets:

```rust
let config = GlobalResourcePoolConfig {
    scan_ring_bytes: 100 * 1024 * 1024,   // 100 MB
    delta_cache_bytes: 200 * 1024 * 1024, // 200 MB
    spill_slots: Some(4),
};
```

Larger budgets:

```rust
let config = GlobalResourcePoolConfig {
    scan_ring_bytes: 1024 * 1024 * 1024,   // 1 GB
    delta_cache_bytes: 2048 * 1024 * 1024, // 2 GB
    spill_slots: Some(32),
};
```

Unlimited spilling:

```rust
let config = GlobalResourcePoolConfig {
    scan_ring_bytes: 200 * 1024 * 1024,
    delta_cache_bytes: 400 * 1024 * 1024,
    spill_slots: None, // Unlimited spilling
};
```

Related modules:
- ByteBudget (crates/scanner-scheduler/src/scheduler/budget.rs) - Per-resource byte budget with atomic tracking
- CountBudget (crates/scanner-scheduler/src/scheduler/count_budget.rs) - Counted slot budget (for spill concurrency)
- ObjectFrontier (crates/scanner-scheduler/src/scheduler/task_graph.rs) - Per-object concurrency and memory limits
- Scheduler resource-control modules (crates/scanner-scheduler/src/scheduler/) - module export is in place; runtime wiring is still pending