Commit 440fe0a

gitignore: ignore perf.data* profiling files
1 parent 9dd1fe1 commit 440fe0a

5 files changed: 112 additions & 90 deletions


.gitignore

Lines changed: 3 additions & 1 deletion
@@ -118,4 +118,6 @@ Cargo.lock
 # These are backup files generated by rustfmt
 **/*.rs.bk
 
-.idea
+.idea
+# Ignore perf profiling data
+perf.data*
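
Note: Linux perf writes its profiling output to perf.data and rotates the previous run to perf.data.old, so the perf.data* glob should cover both the current and the backed-up profile.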

src/apps/hnsw/coordinator/mod.rs

Lines changed: 63 additions & 34 deletions
@@ -133,10 +133,11 @@ impl Service for HNSWIndexService {
                 )
                 .await
                 .map_err(|e| format!("Failed to create new job (RPC): {:?}", e))??;
-            cell_client
-                .next_iteration(false, job_id)
+            // Advance from initial starting vertices using combined RPC
+            let _ = cell_client
+                .advance_iteration(job_id, Vec::new(), false)
                 .await
-                .map_err(|e| format!("Failed to navigate to closest (RPC): {:?}", e))??;
+                .map_err(|e| format!("Failed to navigate to closest (RPC): {:?}", e))?;
             cell_client
                 .index_cell(job_id, cell_id)
                 .await
@@ -159,16 +160,30 @@ impl Service for HNSWIndexService {
         async move {
             self.broadcast_new_job(job_id, schema, field_id, query, 1, ef, ef, metric)
                 .await?;
-            self.broadcast_navigate_to_closest(job_id, max_iter as usize)
+            let participants = self
+                .broadcast_navigate_to_closest(job_id, max_iter as usize)
                 .await?;
-            let tops = broadcast_to_members(
-                &self.conshash,
-                move |partition_svr: Arc<PartitionSvrClient>| async move {
-                    partition_svr.top(job_id).await
-                },
-            )
-            .await
-            .map_err(|e| format!("Failed to get top result: {:?}", e))?;
+            let tops = if participants.is_empty() {
+                broadcast_to_members(
+                    &self.conshash,
+                    move |partition_svr: Arc<PartitionSvrClient>| async move {
+                        partition_svr.top(job_id).await
+                    },
+                )
+                .await
+                .map_err(|e| format!("Failed to get top result: {:?}", e))?
+            } else {
+                let server_ids = participants.into_iter().sorted().collect_vec();
+                broadcast_with_server_ids(
+                    server_ids.into_iter(),
+                    &self.conshash,
+                    move |partition_svr: Arc<PartitionSvrClient>| async move {
+                        partition_svr.top(job_id).await
+                    },
+                )
+                .await
+                .map_err(|e| format!("Failed to get top result: {:?}", e))?
+            };
             let top = tops
                 .iter()
                 .filter_map(|(sid, res)| {
@@ -206,16 +221,30 @@ impl Service for HNSWIndexService {
         async move {
             self.broadcast_new_job(job_id, schema, field_id, query, k as u64, ef, ef, metric)
                 .await?;
-            self.broadcast_navigate_to_closest(job_id, max_iter as usize)
+            let participants = self
+                .broadcast_navigate_to_closest(job_id, max_iter as usize)
                 .await?;
-            let tops = broadcast_to_members(
-                &self.conshash,
-                move |partition_svr: Arc<PartitionSvrClient>| async move {
-                    partition_svr.top_k(job_id, k as u32).await
-                },
-            )
-            .await
-            .map_err(|e| format!("Failed to get top result: {:?}", e))?;
+            let tops = if participants.is_empty() {
+                broadcast_to_members(
+                    &self.conshash,
+                    move |partition_svr: Arc<PartitionSvrClient>| async move {
+                        partition_svr.top_k(job_id, k as u32).await
+                    },
+                )
+                .await
+                .map_err(|e| format!("Failed to get top result: {:?}", e))?
+            } else {
+                let server_ids = participants.into_iter().sorted().collect_vec();
+                broadcast_with_server_ids(
+                    server_ids.into_iter(),
+                    &self.conshash,
+                    move |partition_svr: Arc<PartitionSvrClient>| async move {
+                        partition_svr.top_k(job_id, k as u32).await
+                    },
+                )
+                .await
+                .map_err(|e| format!("Failed to get top result: {:?}", e))?
+            };
             let top_k = tops
                 .iter()
                 .filter_map(|(sid, res)| {
@@ -382,14 +411,15 @@ impl HNSWIndexService {
         &self,
         job_id: JobId,
         max_iter: usize,
-    ) -> Result<(), String> {
+    ) -> Result<HashSet<u64>, String> {
+        let mut participants: HashSet<u64> = HashSet::default();
         let mut new_frontiers = self.broadcast_all_next_iteration(job_id, false).await?;
         let mut iter = 1;
         while !new_frontiers.is_empty() {
             if iter >= max_iter {
                 // reached max iterations, just stop there
                 // TODO: maybe we should consider repartition the graph when this happens
-                return Ok(());
+                return Ok(participants);
             }
             iter += 1;
             let server_frontiers = new_frontiers
@@ -402,22 +432,17 @@ impl HNSWIndexService {
                 .collect::<HashMap<_, _>>();
             let frontiers = Arc::new(server_frontiers);
             let server_ids = frontiers.keys().cloned().collect_vec();
+            participants.extend(server_ids.iter().cloned());
             let broadcast_res = broadcast_with_server_ids(
                 server_ids.into_iter(),
                 &self.conshash,
                 move |partition_svr: Arc<PartitionSvrClient>| {
                     let frontiers = frontiers.clone();
                     async move {
                         let ids = frontiers.get(&partition_svr.server_id()).unwrap();
-                        let set_res = partition_svr.set_frontiers(job_id, ids.clone()).await?;
-                        if let Err(e) = &set_res {
-                            return Ok(Err(format!(
-                                "Failed to set frontiers on {}: {:?}",
-                                partition_svr.server_id(),
-                                e
-                            )));
-                        }
-                        let new_frontier = partition_svr.next_iteration(false, job_id).await?;
+                        let new_frontier = partition_svr
+                            .advance_iteration(job_id, ids.clone(), false)
+                            .await?;
                         Ok(new_frontier)
                     }
                 },
@@ -445,7 +470,7 @@ impl HNSWIndexService {
                 }
             }
         }
-        Ok(())
+        Ok(participants)
     }
 
     async fn broadcast_next_iteration<I>(
@@ -457,12 +482,16 @@ impl HNSWIndexService {
     where
         I: Iterator<Item = u64>,
     {
+        // With advance_iteration, there is no separate next_iteration without frontiers.
+        // So we call it with empty frontiers to drive local iteration.
+        let empty: Vec<Id> = Vec::new();
         let next_iteration_res = broadcast_with_server_ids(
             server_ids.into_iter(),
             &self.conshash,
             move |partition_svr: Arc<PartitionSvrClient>| {
                 let job_id = job_id.clone();
-                async move { partition_svr.next_iteration(readonly, job_id).await }
+                let empty_clone = empty.clone();
+                async move { partition_svr.advance_iteration(job_id, empty_clone, readonly).await }
             },
         )
         .await
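
Taken together, the coordinator changes implement one optimization: while broadcast_navigate_to_closest iterates, it records every server that ever received frontiers, and the final top/top_k aggregation is sent only to that participant set (sorted for deterministic ordering). Only when no remote server participated does it fall back to the original cluster-wide broadcast_to_members path. A minimal, self-contained sketch of that bookkeeping, with plain standard-library maps standing in for the project's conshash and RPC clients (next_frontiers is a made-up stand-in for one search round):

use std::collections::{HashMap, HashSet};

// Hypothetical stand-in for one search round: maps server id -> frontier
// vertices that server must expand next. An empty map means converged.
fn next_frontiers(round: usize) -> HashMap<u64, Vec<u64>> {
    match round {
        0 => HashMap::from([(1, vec![10, 11]), (3, vec![12])]),
        1 => HashMap::from([(3, vec![13])]),
        _ => HashMap::new(),
    }
}

fn main() {
    // Record every server that ever received frontiers during the search.
    let mut participants: HashSet<u64> = HashSet::new();
    let mut round = 0;
    loop {
        let frontiers = next_frontiers(round);
        if frontiers.is_empty() {
            break;
        }
        participants.extend(frontiers.keys().copied());
        round += 1;
    }
    // Fan the final top/top_k query out only to the participants,
    // in sorted order, instead of broadcasting to every member.
    let mut targets: Vec<u64> = participants.into_iter().collect();
    targets.sort_unstable(); // mirrors .sorted() in the commit
    println!("query top-k only on servers {:?}", targets);
}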

src/apps/hnsw/partition/search.rs

Lines changed: 24 additions & 22 deletions
@@ -454,28 +454,30 @@ impl HnswOnlinePartition {
                 vec![]
             }
             Err(NeighbourhoodError::IdListError(IdListError::ContainerCellNotReady)) => {
-                // If the container cell is not ready, we need to prepare the it
-                append_job_log(
-                    logger,
-                    job_id,
-                    JobLogLevel::Info,
-                    format!("Container cell not ready for vertex {:?}, preparing it", id),
-                );
-                let field_id = ed.as_field();
-                let _ = engine
-                    .graph_transaction(DefaultPartitioner, move |txn| {
-                        let mut list = IdList::from_txn_and_container(
-                            &txn.neb_txn,
-                            *id,
-                            field_id,
-                            schema_id,
-                        );
-                        async move {
-                            list.ensure_list().await;
-                            Ok(())
-                        }
-                    })
-                    .await;
+                // Respect readonly: do not perform writes during query navigation
+                if !readonly {
+                    append_job_log(
+                        logger,
+                        job_id,
+                        JobLogLevel::Info,
+                        format!("Container cell not ready for vertex {:?}, preparing it", id),
+                    );
+                    let field_id = ed.as_field();
+                    let _ = engine
+                        .graph_transaction(DefaultPartitioner, move |txn| {
+                            let mut list = IdList::from_txn_and_container(
+                                &txn.neb_txn,
+                                *id,
+                                field_id,
+                                schema_id,
+                            );
+                            async move {
+                                list.ensure_list().await;
+                                Ok(())
+                            }
+                        })
+                        .await;
+                }
                 vec![]
             }
             Err(e) => {
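
The effect of the new gate: a not-ready container cell is repaired lazily only when the job may write, while readonly (query-time) traversals skip the repair and simply report an empty neighbourhood for this round. A toy sketch of the pattern, with a plain HashMap standing in for the graph engine (neighbours and store are invented for illustration):

use std::collections::HashMap;

// An unready neighbour list is repaired lazily only on the write path.
fn neighbours(store: &mut HashMap<u64, Vec<u64>>, id: u64, readonly: bool) -> Vec<u64> {
    if let Some(list) = store.get(&id) {
        return list.clone();
    }
    if !readonly {
        // Write path: prepare the missing container, like ensure_list().
        store.insert(id, Vec::new());
    }
    // Either way, the caller sees an empty neighbourhood this iteration.
    Vec::new()
}

fn main() {
    let mut store = HashMap::new();
    assert!(neighbours(&mut store, 7, true).is_empty()); // readonly: no write
    assert!(!store.contains_key(&7));
    neighbours(&mut store, 7, false); // write path repairs the container
    assert!(store.contains_key(&7));
}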

src/apps/hnsw/partition/service.rs

Lines changed: 16 additions & 27 deletions
@@ -47,9 +47,9 @@ pub mod service {
         rpc end_job(job_id: JobId) -> bool;
 
         // Then use multiple iterations (or not) to do the actual search
-        // It returns remove frontiers to be processed by other partitions
-        rpc next_iteration(readonly: bool, job_id: JobId) -> Result<HashSet<Id>, String>;
-        rpc set_frontiers(job_id: JobId, frontiers: Vec<Id>) -> Result<(), String>;
+        // It returns remote frontiers to be processed by other partitions
+        // Combined RPC to reduce per-iteration round-trips: set frontiers and advance one iteration
+        rpc advance_iteration(job_id: JobId, frontiers: Vec<Id>, readonly: bool) -> Result<HashSet<Id>, String>;
 
         // After a search is completed, it can call following functions
         rpc index_cell(job_id: JobId, cell_id: Id) -> Result<(), String>;
@@ -158,34 +158,13 @@ impl service::Service for service::HNSWPartitionService {
         .boxed()
     }
 
-    fn set_frontiers<'a>(
+
+
+    fn advance_iteration<'a>(
         &'a self,
         job_id: JobId,
         frontiers: Vec<Id>,
-    ) -> BoxFuture<'a, Result<(), String>> {
-        async move {
-            let job = match self
-                .jobs
-                .get(&job_id)
-                .ok_or(format!("Job {:?} not found", job_id))
-            {
-                Ok(job) => job,
-                Err(msg) => {
-                    append_job_log(&self.job_logger, job_id, JobLogLevel::Error, msg.clone());
-                    return Err(msg);
-                }
-            };
-            let mut job = job.lock().await;
-            job.frontier = frontiers;
-            Ok(())
-        }
-        .boxed()
-    }
-
-    fn next_iteration<'a>(
-        &'a self,
         readonly: bool,
-        job_id: JobId,
     ) -> BoxFuture<'a, Result<HashSet<Id>, String>> {
         async move {
             let job = match self
@@ -199,6 +178,14 @@ impl service::Service for service::HNSWPartitionService {
                     return Err(msg);
                 }
             };
+            // Set incoming frontiers if provided; otherwise preserve current frontier
+            {
+                let mut job = job.lock().await;
+                if !frontiers.is_empty() {
+                    job.frontier = frontiers;
+                }
+            }
+            // Now run next_iteration with the provided readonly flag
             let mut job = job.lock().await;
             let metric = Metric::from_encoding(job.metric);
             let logger = &self.job_logger;
@@ -240,6 +227,8 @@ impl service::Service for service::HNSWPartitionService {
         .boxed()
     }
 
+
+
     fn index_cell<'a>(&'a self, job_id: JobId, cell_id: Id) -> BoxFuture<'a, Result<(), String>> {
         async move {
             self.new_vertex(job_id, cell_id)
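
The merged RPC keeps both of the old call patterns: passing real frontiers replaces the former set_frontiers + next_iteration pair in a single round-trip, while passing an empty Vec means "advance with whatever frontier the job already holds", which is how the old parameterless next_iteration callers migrate. A toy single-process sketch of these semantics (the Job struct and its expansion rule are invented; the real service locks the job table entry and runs an HNSW search step):

use std::collections::HashSet;

// Hypothetical stand-in for the per-job search state held by a partition.
struct Job {
    frontier: Vec<u64>,
}

impl Job {
    // Combined step: install incoming frontiers if provided (otherwise keep
    // the current ones), then run one iteration and return remote frontiers.
    fn advance_iteration(&mut self, frontiers: Vec<u64>, readonly: bool) -> HashSet<u64> {
        if !frontiers.is_empty() {
            self.frontier = frontiers;
        }
        // Toy "iteration": expand each frontier vertex to id + 1 and stop.
        // A readonly pass would additionally skip any lazy repairs.
        let _ = readonly;
        let next: HashSet<u64> = self.frontier.iter().map(|id| id + 1).collect();
        self.frontier.clear();
        next
    }
}

fn main() {
    // Empty frontiers: behaves like the old parameterless next_iteration.
    let mut local = Job { frontier: vec![5] };
    let remote = local.advance_iteration(Vec::new(), false);
    // Non-empty frontiers: behaves like set_frontiers + next_iteration.
    let mut peer = Job { frontier: Vec::new() };
    let remote2 = peer.advance_iteration(remote.into_iter().collect(), false);
    println!("{:?}", remote2);
}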

src/apps/hnsw/partition/tests.rs

Lines changed: 6 additions & 6 deletions
@@ -235,8 +235,8 @@ impl TestEnvironment {
             )
             .await
             .map_err(|e| format!("Failed to create job: {:?}", e))?;
-            partition_svr
-                .next_iteration(false, job_id)
+            let _ = partition_svr
+                .advance_iteration(job_id, Vec::new(), false)
                 .await
                 .map_err(|e| format!("Failed to run job with next iteration: {:?}", e))?;
             partition_svr
@@ -246,7 +246,7 @@ impl TestEnvironment {
             // let mut search = partition.new_search(CELL_SCHEMA_ID, VECTOR_FIELD_ID, &vector, 1, MetricEncoding::L2)
             //     .await
             //     .map_err(|e| format!("Failed to create search: {:?}", e))?;
-            // partition.next_iteration(&mut search, L2Metric, logger, job_id).await.map_err(|e| format!("Failed to run search iteration: {:?}", e))?;
+            // (legacy) partition.next_iteration(&mut search, ...)
             // partition.new_vertex(job_id, cell_id, &vector, CELL_SCHEMA_ID, VECTOR_FIELD_ID, L2Metric, SearchMetadata::default(), 3, logger).await.map_err(|e| format!("Failed to create vertex: {:?}", e))?;
         }
         Ok(())
@@ -276,16 +276,16 @@ impl TestEnvironment {
         )
         .await
         .map_err(|e| format!("Failed to create job: {:?}", e))?;
-        partition
-            .next_iteration(false, job_id)
+        let _ = partition
+            .advance_iteration(job_id, Vec::new(), false)
             .await
             .map_err(|e| format!("Failed to run job with next iteration: {:?}", e))?;
         let results = partition
            .top_k(job_id, k as u32)
            .await
            .map_err(|e| format!("Failed to search: {:?}", e))?;
         let duration = start.elapsed();
-        println!("Search took {:?} microseconds", duration.as_micros());
+        println!("Search took {:?} ms", duration.as_millis());
         Ok(results)
     }
 
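
A small test tweak rides along: the elapsed-time report switches from microseconds to milliseconds. The measurement itself is the usual std::time::Instant pattern:

use std::time::Instant;

fn main() {
    let start = Instant::now();
    // ... run the search under test here ...
    let duration = start.elapsed();
    // as_millis() truncates to whole milliseconds; as_micros() is the finer unit
    println!("Search took {:?} ms", duration.as_millis());
}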
