Skip to content

Commit 7aac869

Browse files
committed
Cap outbound batches by message_batch_size
1 parent aecadb1 commit 7aac869

2 files changed

Lines changed: 97 additions & 4 deletions

File tree

src/tests/compute/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,65 @@ async fn test_compute_worker_scatter_and_apply() {
268268
assert_eq!(state_v3, 1);
269269
}
270270

271+
#[tokio::test]
272+
async fn test_compute_worker_splits_outbound_batches_by_message_batch_size() {
273+
let program = Arc::new(TestProgram::new());
274+
let store = InMemoryStateStore::<i64, i64>::new();
275+
276+
let mut adjacency = LocalAdjacency::new();
277+
let source = Id::new(1, 11);
278+
let neighbors = [Id::new(1, 12), Id::new(1, 13), Id::new(1, 14), Id::new(1, 15), Id::new(1, 16)];
279+
for neighbor in neighbors {
280+
adjacency.add_edge(source, neighbor);
281+
}
282+
283+
let transport = Arc::new(LocalTransport::new());
284+
285+
let mut worker = ComputeWorker::new(
286+
1,
287+
JobId {
288+
coordinator_server_id: 1,
289+
coordinator_job_id: 11,
290+
},
291+
program,
292+
store,
293+
adjacency,
294+
transport,
295+
AdjacencyConfig {
296+
edge_schemas: vec![],
297+
direction: Direction::Outbound,
298+
type_list: None,
299+
weight: WeightSpec::Hops,
300+
},
301+
JobOptions {
302+
max_active_per_step: usize::MAX,
303+
message_batch_size: 2,
304+
},
305+
);
306+
307+
let seed_msg = worker.program.codec().msg_encode(&1);
308+
worker
309+
.deliver_messages(vec![(source, seed_msg)])
310+
.expect("deliver seed");
311+
312+
let output = worker
313+
.run_superstep(0, &GlobalStepParams { step: 0 })
314+
.expect("step 0");
315+
316+
assert_eq!(output.report.messages_sent, 5);
317+
assert_eq!(output.report.batches_sent, 3);
318+
assert_eq!(output.outbound.len(), 3);
319+
assert!(output.outbound.iter().all(|batch| batch.server_id == 1));
320+
assert_eq!(
321+
output
322+
.outbound
323+
.iter()
324+
.map(|batch| batch.messages.len())
325+
.collect::<Vec<_>>(),
326+
vec![2, 2, 1]
327+
);
328+
}
329+
271330
#[test]
272331
fn test_vertex_set_manifest_roundtrip() {
273332
let shard = VertexSetShard {

src/traversal/compute/engine.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@ use serde::{Deserialize, Serialize};
1414

1515
#[derive(Debug, Clone, Serialize, Deserialize)]
1616
pub struct JobOptions {
17+
/// Maximum number of active vertices processed in a single superstep.
1718
pub max_active_per_step: usize,
19+
/// Maximum number of messages carried by a single `OutboundBatch` for one
20+
/// destination server.
21+
///
22+
/// When a server receives more emitted messages than this in one
23+
/// superstep, the worker splits them into multiple `OutboundBatch`
24+
/// records. Values less than 1 are clamped to 1 by the worker.
1825
pub message_batch_size: usize,
1926
}
2027

@@ -37,6 +44,8 @@ pub struct SuperstepReport {
3744
pub active_vertices: usize,
3845
pub messages_sent: usize,
3946
pub bytes_sent: usize,
47+
/// Number of outbound `OutboundBatch` chunks produced after grouping by
48+
/// destination server and splitting by `JobOptions::message_batch_size`.
4049
pub batches_sent: usize,
4150
pub misrouted: usize,
4251
}
@@ -50,6 +59,11 @@ pub struct DeliverReport {
5059
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
5160
pub struct SuperstepOutput {
5261
pub report: SuperstepReport,
62+
/// Outbound message chunks ready for delivery.
63+
///
64+
/// There may be multiple entries for the same `server_id` when one
65+
/// superstep emits more than `JobOptions::message_batch_size` messages to
66+
/// that destination server.
5367
pub outbound: Vec<OutboundBatch>,
5468
}
5569

@@ -497,16 +511,36 @@ where
497511
.collect();
498512
self.retry_adjacency.extend(reduced.retry_adjacency.iter().copied());
499513

514+
let batch_size = self.options.message_batch_size.max(1);
500515
let outbound: Vec<OutboundBatch> = reduced
501516
.outbound
502517
.into_iter()
503-
.map(|(server_id, messages)| {
518+
.flat_map(|(server_id, messages)| {
504519
report.messages_sent += messages.len();
505520
report.bytes_sent += messages.iter().map(|(_, m)| m.len()).sum::<usize>();
506-
OutboundBatch {
507-
server_id,
508-
messages,
521+
522+
let mut batches = Vec::new();
523+
let mut current = Vec::with_capacity(batch_size.min(messages.len()));
524+
525+
for message in messages {
526+
current.push(message);
527+
if current.len() == batch_size {
528+
batches.push(OutboundBatch {
529+
server_id,
530+
messages: current,
531+
});
532+
current = Vec::with_capacity(batch_size);
533+
}
509534
}
535+
536+
if !current.is_empty() {
537+
batches.push(OutboundBatch {
538+
server_id,
539+
messages: current,
540+
});
541+
}
542+
543+
batches
510544
})
511545
.collect();
512546
report.batches_sent = outbound.len();

0 commit comments

Comments
 (0)