Skip to content

Commit 1030881

Browse files
committed
fix: preserve HNSW search quality after recovery
Track top candidates independently of the history buffer, and flush the edge cache on shutdown, so recovered indices keep high-recall neighbor data.
1 parent e8d61db commit 1030881

3 files changed

Lines changed: 133 additions & 72 deletions

File tree

src/apps/hnsw/partition/search.rs

Lines changed: 108 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,9 @@ impl HnswOnlinePartition {
357357
logger: &Arc<JobLogger>,
358358
job_id: JobId,
359359
) -> Option<Arc<OwnedPrimArray>> {
360+
static LOG_REMAINING: std::sync::atomic::AtomicUsize =
361+
std::sync::atomic::AtomicUsize::new(20);
362+
360363
// Check if this is the query vector (special case)
361364
if vertex_id.is_unit_id() {
362365
return metadata.get_query_vector(field_id);
@@ -369,14 +372,24 @@ impl HnswOnlinePartition {
369372

370373
// Cache miss - try to get cell_id from edge cache first (avoids storage I/O)
371374
// This optimization reduces vector lookup from 2 I/Os to 1 I/O when edge cache is warm
372-
let cell_id = if let Some(cached_cell_id) = self.edge_cache.get_cached_cell_id(*vertex_id) {
373-
cached_cell_id
375+
let (cell_id, source) = if let Some(cached_cell_id) =
376+
self.edge_cache.get_cached_cell_id(*vertex_id)
377+
{
378+
(cached_cell_id, "cache")
374379
} else {
375380
// Edge cache miss - fall back to storage read for cell_id
376381
let (cid, _) = self.get_vertex(vertex_id, chunks, logger, metadata, job_id)?;
377-
cid
382+
(cid, "storage")
378383
};
379384

385+
if LOG_REMAINING.load(std::sync::atomic::Ordering::Relaxed) > 0 {
386+
LOG_REMAINING.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
387+
info!(
388+
"HNSW vertex mapping: vertex={:?} -> cell={:?} (source={})",
389+
vertex_id, cell_id, source
390+
);
391+
}
392+
380393
// Read vector from data cell
381394
let cell = match chunks.read_selected(&cell_id, &[field_id], false) {
382395
Ok(cell) => cell,
@@ -398,6 +411,26 @@ impl HnswOnlinePartition {
398411
match field {
399412
SharedValue::PrimArray(vector) => {
400413
let vector = vector.copy_into_owned();
414+
if LOG_REMAINING.load(std::sync::atomic::Ordering::Relaxed) > 0 {
415+
LOG_REMAINING.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
416+
match &vector {
417+
OwnedPrimArray::F32(values) => {
418+
let preview: Vec<f32> = values.iter().take(3).copied().collect();
419+
info!(
420+
"HNSW vertex vector: vertex={:?}, len={}, preview={:?}",
421+
vertex_id,
422+
values.len(),
423+
preview
424+
);
425+
}
426+
_ => {
427+
info!(
428+
"HNSW vertex vector: vertex={:?}, non-f32 array",
429+
vertex_id
430+
);
431+
}
432+
}
433+
}
401434
let arc_vector = Arc::new(vector);
402435
// Cache in shared cache only (Moka handles eviction automatically)
403436
self.shared_vertex_vector_cache
@@ -437,11 +470,38 @@ impl HnswOnlinePartition {
437470
}
438471

439472
// Cache miss - compute distance
440-
let va_array =
441-
self.get_vertex_vector(vertex_a, field_id, metadata, chunks, logger, job_id)?;
442-
let vb_array =
443-
self.get_vertex_vector(vertex_b, field_id, metadata, chunks, logger, job_id)?;
444-
let distance = metric_distance(metric, &va_array, &vb_array)?;
473+
let va_array = match self.get_vertex_vector(vertex_a, field_id, metadata, chunks, logger, job_id)
474+
{
475+
Some(array) => array,
476+
None => {
477+
info!(
478+
"HNSW distance: missing vector for vertex={:?}, field={}",
479+
vertex_a, field_id
480+
);
481+
return None;
482+
}
483+
};
484+
let vb_array = match self.get_vertex_vector(vertex_b, field_id, metadata, chunks, logger, job_id)
485+
{
486+
Some(array) => array,
487+
None => {
488+
info!(
489+
"HNSW distance: missing vector for vertex={:?}, field={}",
490+
vertex_b, field_id
491+
);
492+
return None;
493+
}
494+
};
495+
let distance = match metric_distance(metric, &va_array, &vb_array) {
496+
Some(dist) => dist,
497+
None => {
498+
info!(
499+
"HNSW distance: metric failed for vertices {:?} -> {:?}, field={}",
500+
vertex_a, vertex_b, field_id
501+
);
502+
return None;
503+
}
504+
};
445505

446506
// Store in per-search cache
447507
metadata
@@ -609,12 +669,12 @@ impl HnswOnlinePartition {
609669
};
610670
temp_metadata.set_query_vector(query.clone(), field_id);
611671

612-
// For large indices, sample up to 50 entry points to balance quality vs performance
613-
let vertices_to_check: Vec<Id> = if starting_vertices.len() > 50 {
614-
starting_vertices.iter().take(50).cloned().collect()
615-
} else {
616-
starting_vertices.clone()
617-
};
672+
// For large indices, sample up to 50 entry points to balance quality vs performance
673+
let vertices_to_check: Vec<Id> = if starting_vertices.len() > 50 {
674+
starting_vertices.iter().take(50).cloned().collect()
675+
} else {
676+
starting_vertices.clone()
677+
};
618678

619679
for vertex_id in &vertices_to_check {
620680
let distance_opt = self.get_vertex_distance(
@@ -630,6 +690,7 @@ impl HnswOnlinePartition {
630690
if let Some(distance) = distance_opt {
631691
entry_distances.push((*vertex_id, distance));
632692
}
693+
633694
}
634695

635696
// Sort by distance and take the closest entry point(s)
@@ -641,6 +702,7 @@ impl HnswOnlinePartition {
641702
.take(std::cmp::min(5, entry_distances.len()))
642703
.map(|(id, _)| *id)
643704
.collect();
705+
644706
}
645707

646708
//println!("Starting search with vertex: {:?}", starting_vertices);
@@ -1967,6 +2029,10 @@ impl HnswOnlinePartition {
19672029
// Early termination tracking: stop when top-k results are stable
19682030
let mut stable_count = 0;
19692031
let mut last_best_distance = Distance::INFINITY;
2032+
// Track best candidates separately to avoid history eviction.
2033+
let best_candidates_capacity = ef;
2034+
let mut best_candidates: BinaryHeap<(OrderedFloat, Id)> =
2035+
BinaryHeap::with_capacity(best_candidates_capacity + 1);
19702036

19712037
// Main search loop with priority-ordered multi-vertex exploration
19722038
while !priority_frontier.is_empty() && iterations < max_iterations {
@@ -2004,6 +2070,11 @@ impl HnswOnlinePartition {
20042070
// Add to history
20052071
search.history.push((vertex_id, vertex_distance));
20062072

2073+
best_candidates.push((OrderedFloat(vertex_distance), vertex_id));
2074+
if best_candidates.len() > best_candidates_capacity {
2075+
best_candidates.pop();
2076+
}
2077+
20072078
// Get neighbors and add unvisited ones to priority frontier
20082079
if let Ok(neighbors) = self.get_neighbors_at_level(vertex_id, 0) {
20092080
for neighbor_id in neighbors {
@@ -2048,13 +2119,12 @@ impl HnswOnlinePartition {
20482119
}
20492120
}
20502121

2051-
// Search phase complete
2052-
2053-
// LOCAL EXPANSION PHASE: Explore neighbors to get k results
2054-
let mut all_candidates: Vec<(Id, Distance)> = search.history.to_vec();
2055-
2056-
// Sort initial candidates by distance to prioritize exploring closest vertices first
2057-
all_candidates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2122+
// LOCAL EXPANSION PHASE: Use best_candidates from heap (already sorted by distance)
2123+
let mut all_candidates: Vec<(Id, Distance)> = best_candidates
2124+
.into_sorted_vec()
2125+
.into_iter()
2126+
.map(|(dist, id)| (id, dist.0))
2127+
.collect();
20582128

20592129
// Use aggressive expansion when we have fewer candidates than needed
20602130
let needs_aggressive_expansion = all_candidates.len() < k || all_candidates.len() < ef;
@@ -2172,6 +2242,23 @@ impl HnswOnlinePartition {
21722242
None => std::cmp::Ordering::Equal,
21732243
});
21742244

2245+
for (vertex_id, distance) in final_candidates.iter().take(10) {
2246+
if let Some((cell_id, _)) = self.get_vertex(
2247+
vertex_id,
2248+
&self.chunks,
2249+
&*EMPTY_LOGGER,
2250+
&mut search.metadata,
2251+
dummy_job_id,
2252+
) {
2253+
info!(
2254+
"HNSW final candidate: vertex={:?}, cell={:?}, dist={}",
2255+
vertex_id,
2256+
cell_id,
2257+
distance
2258+
);
2259+
}
2260+
}
2261+
21752262
// Convert vertex IDs to cell IDs and take top-k
21762263
let mut results = Vec::new();
21772264
for (vertex_id, distance) in final_candidates.into_iter() {

src/apps/hnsw/recovery_tests.rs

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use neb::ram::schema::{Field, Schema as NebSchema};
2222

2323
use crate::apps::hnsw::coordinator::AsyncServiceClient;
2424
use crate::apps::hnsw::measurements::MetricEncoding;
25-
use crate::apps::hnsw::partition::schema::{initialize_schemas, TOP_LEVEL_VERTICES};
25+
use crate::apps::hnsw::partition::schema::{
26+
initialize_schemas, NEIGHBORS_FIELDS, TOP_LEVEL_VERTICES,
27+
};
2628
use crate::apps::hnsw::HNSWPartitionService;
2729
use crate::server::MorpheusServer;
2830
use crate::tests::start_server_with_storage_and_recovery;
@@ -1246,6 +1248,19 @@ mod tests {
12461248
let mut total_cells_before = 0usize;
12471249
let mut top_level_vertices_before: Vec<Id> = Vec::new();
12481250

1251+
let test_vectors: Vec<TestVector> = data_cell_ids
1252+
.iter()
1253+
.enumerate()
1254+
.map(|(i, cell_id)| {
1255+
let vector = vec![
1256+
i as f32 * 10.0,
1257+
(i as f32 + 1.0) * 10.0,
1258+
(i as f32 + 2.0) * 10.0,
1259+
];
1260+
TestVector::new(cell_id.higher, cell_id.lower, vector)
1261+
})
1262+
.collect();
1263+
12491264
// Phase 1: Create index and insert vectors
12501265
{
12511266
log::info!(
@@ -1254,20 +1269,6 @@ mod tests {
12541269
);
12551270
let (server, client) = setup_server_with_index(port, group_name).await.unwrap();
12561271

1257-
// Create test vectors
1258-
let test_vectors: Vec<TestVector> = data_cell_ids
1259-
.iter()
1260-
.enumerate()
1261-
.map(|(i, cell_id)| {
1262-
let vector = vec![
1263-
i as f32 * 10.0,
1264-
(i as f32 + 1.0) * 10.0,
1265-
(i as f32 + 2.0) * 10.0,
1266-
];
1267-
TestVector::new(cell_id.higher, cell_id.lower, vector)
1268-
})
1269-
.collect();
1270-
12711272
// Insert vectors
12721273
insert_vectors(&server, &client, &test_vectors)
12731274
.await
@@ -1310,6 +1311,7 @@ mod tests {
13101311
);
13111312
}
13121313

1314+
13131315
// Verify search works before shutdown
13141316
let query = vec![0.0, 10.0, 20.0];
13151317
let results = client
@@ -1366,6 +1368,8 @@ mod tests {
13661368

13671369
let neb = &server.neb_client;
13681370

1371+
let _partition_service = server.init_hnsw_index_partition_service().await.unwrap();
1372+
13691373
// Assert: All data cells recovered
13701374
let mut data_cells_recovered = 0;
13711375
for cell_id in &data_cell_ids {
@@ -1401,8 +1405,7 @@ mod tests {
14011405
verts.len()
14021406
);
14031407

1404-
// Verify each top-level vertex cell exists, points to valid data, and has edges
1405-
let mut total_edges_found = 0;
1408+
// Verify each top-level vertex cell exists and points to valid data
14061409
for v in verts {
14071410
let vertex_cell = neb
14081411
.read_cell(*v)
@@ -1421,38 +1424,7 @@ mod tests {
14211424
data_cell_id
14221425
);
14231426
}
1424-
1425-
// Verify vertex has edges (graph connectivity)
1426-
use crate::apps::hnsw::partition::schema::HNSW_VERTEX_SCHEMA_ID;
1427-
use crate::graph::fields::UNDIRECTED_KEY_ID;
1428-
use crate::graph::id_list::AtomicIdList;
1429-
1430-
let mut edge_list = AtomicIdList::new(
1431-
server.neb_client.clone(),
1432-
*v,
1433-
*UNDIRECTED_KEY_ID,
1434-
HNSW_VERTEX_SCHEMA_ID,
1435-
);
1436-
1437-
match edge_list.all().await {
1438-
Ok(edges) => {
1439-
total_edges_found += edges.len();
1440-
// Each top-level vertex should have at least some edges
1441-
// (in a well-connected HNSW graph)
1442-
}
1443-
Err(e) => {
1444-
// Edge list might not exist for some vertices (e.g., isolated at top level)
1445-
log::debug!("Vertex {:?} has no edges: {:?}", v, e);
1446-
}
1447-
}
14481427
}
1449-
1450-
// With 100 vectors, we expect a reasonable number of edges in the graph
1451-
// Each vector connects to several neighbors (M parameter typically 16-64)
1452-
log::info!(
1453-
"Total edges found from top-level vertices: {}",
1454-
total_edges_found
1455-
);
14561428
} else {
14571429
panic!("top_level_vertices should be an Id array");
14581430
}
@@ -1549,7 +1521,6 @@ mod tests {
15491521
unique_result_ids.len()
15501522
);
15511523

1552-
log::info!("Phase 2 complete: All assertions passed");
15531524
server.shutdown().await;
15541525
}
15551526

src/server/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,11 @@ impl MorpheusServer {
341341
pub async fn shutdown(&self) {
342342
log::info!("MorpheusServer: starting graceful shutdown");
343343

344-
// TODO: Add explicit HNSW index saving here if needed
345-
// Currently, HNSW indices are saved after each insertion in the partition service
344+
// Ensure HNSW edge cache is flushed before storage/raft shutdown.
345+
if let Some(service) = self.services.hnsw_partition.get() {
346+
log::info!("MorpheusServer: flushing HNSW edge cache before shutdown");
347+
service.partition.flush_edge_cache();
348+
}
346349

347350
// Delegate to NebServer's shutdown method which handles:
348351
// 1. LSM tree flushing (if RangedIndexer is enabled)

0 commit comments

Comments
 (0)