Skip to content

Commit cd5a714

Browse files
committed
Squashed commit of the following:
commit 14f92b8 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:07:20 2026 +0000 Introduce batchInterval and batchDebounce. commit 12d839c Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:06:45 2026 +0000 Introduce batch notify and assign actions. commit 7393a00 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:06:12 2026 +0000 Use RWLock instead of single Mutext in MemoryAwaitedActionDb. commit 8cfeade Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Sun Jan 25 12:05:37 2026 +0000 Fix batch matching by allowing same worker accept multiple jobs. commit 7cd29c9 Author: Dmitrii Kostyrev <dkostyrev@joom.com> Date: Fri Jan 23 12:15:40 2026 +0000 Introduce batch worker matching.
1 parent 2299cd1 commit cd5a714

7 files changed

Lines changed: 550 additions & 52 deletions

File tree

nativelink-config/src/schedulers.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ const fn default_worker_match_logging_interval_s() -> i64 {
7777
10
7878
}
7979

80+
/// Default batch interval in milliseconds (100ms).
81+
/// This is the maximum time between batch matching cycles.
82+
const fn default_batch_interval_ms() -> u64 {
83+
100
84+
}
85+
86+
/// Default debounce window in milliseconds (20ms).
87+
/// After a trigger, wait this long to collect more changes before running.
88+
const fn default_batch_debounce_ms() -> u64 {
89+
20
90+
}
91+
8092
#[derive(Deserialize, Serialize, Debug, Default)]
8193
#[serde(deny_unknown_fields)]
8294
pub struct SimpleSpec {
@@ -150,6 +162,38 @@ pub struct SimpleSpec {
150162
deserialize_with = "convert_duration_with_shellexpand_and_negative"
151163
)]
152164
pub worker_match_logging_interval_s: i64,
165+
166+
/// Enable batch worker matching optimization.
167+
/// When enabled, the scheduler will collect queued actions and match them
168+
/// to workers in a single batch operation, reducing lock contention.
169+
/// This can significantly improve throughput when there are many queued
170+
/// actions and workers.
171+
/// Default: false
172+
#[serde(default)]
173+
pub enable_batch_worker_matching: bool,
174+
175+
/// Maximum interval between batch matching cycles (milliseconds).
176+
/// Even without triggers, matching runs at least this often.
177+
/// Only used when `enable_batch_worker_matching` is true.
178+
/// Default: 100ms
179+
#[serde(
180+
default = "default_batch_interval_ms",
181+
deserialize_with = "convert_numeric_with_shellexpand"
182+
)]
183+
pub batch_interval_ms: u64,
184+
185+
/// Debounce window after first trigger (milliseconds).
186+
/// When a task or worker change notification is received, wait this long
187+
/// to collect additional changes before running batch match.
188+
/// This improves batching efficiency under bursty load.
189+
/// 0 = immediate (no debounce).
190+
/// Only used when `enable_batch_worker_matching` is true.
191+
/// Default: 20ms
192+
#[serde(
193+
default = "default_batch_debounce_ms",
194+
deserialize_with = "convert_numeric_with_shellexpand"
195+
)]
196+
pub batch_debounce_ms: u64,
153197
}
154198

155199
#[derive(Deserialize, Serialize, Debug)]

nativelink-metric/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,18 @@ impl<T: MetricsComponent> MetricsComponent for async_lock::Mutex<T> {
458458
}
459459
}
460460

461+
impl<T: MetricsComponent> MetricsComponent for async_lock::RwLock<T> {
462+
fn publish(
463+
&self,
464+
kind: MetricKind,
465+
field_metadata: MetricFieldData,
466+
) -> Result<MetricPublishKnownKindData, Error> {
467+
// It is safe to block in the publishing thread.
468+
let lock = self.read_blocking();
469+
lock.publish(kind, field_metadata)
470+
}
471+
}
472+
461473
impl<T: MetricsComponent> MetricsComponent for parking_lot::Mutex<T> {
462474
fn publish(
463475
&self,

0 commit comments

Comments
 (0)