Skip to content

Commit dba74cb

Browse files
committed
probing
1 parent 93dd436 commit dba74cb

5 files changed

Lines changed: 113 additions & 45 deletions

File tree

src/apps/hnsw/partition/search.rs

Lines changed: 71 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{
22
cell::RefCell,
33
collections::{BinaryHeap, HashSet, VecDeque},
4-
sync::Arc, vec,
4+
sync::Arc,
5+
vec,
56
};
67

78
use ahash::HashMap;
@@ -27,7 +28,10 @@ use parking_lot::RwLock;
2728

2829
use crate::{
2930
apps::hnsw::measurements::{value_distance, HnswMetric, Metric, MetricEncoding},
30-
graph::{format_vertex_cell, id_list::IdList, partitioner::DefaultPartitioner, vertex::Vertex, EdgeDirection, GraphEngine, NeighbourhoodError},
31+
graph::{
32+
format_vertex_cell, id_list::IdList, partitioner::DefaultPartitioner, vertex::Vertex,
33+
EdgeDirection, GraphEngine, NeighbourhoodError,
34+
},
3135
job::{
3236
logger::{append_job_log, JobLogLevel, JobLogger},
3337
JobId,
@@ -380,36 +384,46 @@ impl HnswOnlinePartition {
380384
let schema_id = schema.id;
381385
let ed = EdgeDirection::Undirected;
382386
for id in selected_vertex_ids {
383-
let neighbours = match engine
384-
.node_local_neighbour_id_and_edge(id, schema_id, ed) {
385-
Ok(neighbours) => neighbours,
386-
Err(NeighbourhoodError::IdListError(IdListError::ContainerCellNotReady)) if !readonly => vec![],
387-
Err(NeighbourhoodError::IdListError(IdListError::ContainerCellNotReady)) => {
388-
// If the container cell is not ready, we need to prepare the it
389-
let field_id = ed.as_field();
390-
let _ = engine.graph_transaction(DefaultPartitioner, move |txn| {
391-
let mut list = IdList::from_txn_and_container(&txn.neb_txn, *id, field_id, schema_id);
387+
let neighbours = match engine.node_local_neighbour_id_and_edge(id, schema_id, ed) {
388+
Ok(neighbours) => neighbours,
389+
Err(NeighbourhoodError::IdListError(IdListError::ContainerCellNotReady))
390+
if !readonly =>
391+
{
392+
vec![]
393+
}
394+
Err(NeighbourhoodError::IdListError(IdListError::ContainerCellNotReady)) => {
395+
// If the container cell is not ready, we need to prepare it
396+
let field_id = ed.as_field();
397+
let _ = engine
398+
.graph_transaction(DefaultPartitioner, move |txn| {
399+
let mut list = IdList::from_txn_and_container(
400+
&txn.neb_txn,
401+
*id,
402+
field_id,
403+
schema_id,
404+
);
392405
async move {
393406
list.ensure_list().await;
394407
Ok(())
395408
}
396-
}).await;
397-
vec![]
398-
}
399-
Err(e) => {
400-
append_job_log(
401-
logger,
402-
job_id,
403-
JobLogLevel::Error,
404-
format!(
405-
"Neighbourhood error to get neighbours for vertex {:?}: {:?}",
406-
id, e
407-
),
408-
);
409-
return Err(HNSWIndexError::NeighbourhoodError(e));
410-
}
409+
})
410+
.await;
411+
vec![]
412+
}
413+
Err(e) => {
414+
append_job_log(
415+
logger,
416+
job_id,
417+
JobLogLevel::Error,
418+
format!(
419+
"Neighbourhood error to get neighbours for vertex {:?}: {:?}",
420+
id, e
421+
),
422+
);
423+
return Err(HNSWIndexError::NeighbourhoodError(e));
424+
}
411425
};
412-
let new_frontiers = neighbours
426+
let new_frontiers = neighbours
413427
.into_iter()
414428
.map(|(id, _)| id) // Only take the opposite id
415429
.collect::<Vec<_>>();
@@ -630,10 +644,11 @@ impl HnswOnlinePartition {
630644

631645
visited.insert(current_vertex);
632646
// Get all neighbors of the current vertex
633-
match self
634-
.engine
635-
.node_local_neighbour_id_and_edge(&current_vertex, level_schema, EdgeDirection::Undirected)
636-
{
647+
match self.engine.node_local_neighbour_id_and_edge(
648+
&current_vertex,
649+
level_schema,
650+
EdgeDirection::Undirected,
651+
) {
637652
Ok(neighbors) => {
638653
for (neighbor_id, _) in neighbors {
639654
let is_local = self.conshash.get_server_id(neighbor_id.higher)
@@ -649,15 +664,21 @@ impl HnswOnlinePartition {
649664
logger,
650665
job_id,
651666
JobLogLevel::Info,
652-
format!("Container cell not ready for vertex {:?}, skipping", current_vertex),
667+
format!(
668+
"Container cell not ready for vertex {:?}, skipping",
669+
current_vertex
670+
),
653671
);
654-
},
672+
}
655673
Err(e) => {
656674
append_job_log(
657675
logger,
658676
job_id,
659677
JobLogLevel::Error,
660-
format!("Error getting neighbours for vertex {:?}: {:?}", current_vertex, e),
678+
format!(
679+
"Error getting neighbours for vertex {:?}: {:?}",
680+
current_vertex, e
681+
),
661682
);
662683
}
663684
}
@@ -885,6 +906,17 @@ impl HnswOnlinePartition {
885906
);
886907

887908
// Connect the new vertex to its selected neighbors
909+
append_job_log(
910+
logger,
911+
job_id,
912+
JobLogLevel::Info,
913+
format!(
914+
"Going to connect {} neighbors for vertex {:?} at level {}",
915+
selected_neighbors.len(),
916+
vertex_id,
917+
level
918+
),
919+
);
888920
for neighbor_id in &selected_neighbors {
889921
// Create bidirectional edges
890922
match engine
@@ -1119,10 +1151,11 @@ impl HnswOnlinePartition {
11191151
}
11201152

11211153
// Get neighbors of current vertex
1122-
match self
1123-
.engine
1124-
.node_local_neighbour_id_and_edge(&current_vertex, level_schema, EdgeDirection::Undirected)
1125-
{
1154+
match self.engine.node_local_neighbour_id_and_edge(
1155+
&current_vertex,
1156+
level_schema,
1157+
EdgeDirection::Undirected,
1158+
) {
11261159
Ok(neighbors) => {
11271160
for (neighbor_id, _) in neighbors {
11281161
// Only process vertices from this partition

src/apps/hnsw/partition/service.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,11 @@ impl service::Service for service::HnswPartitionService {
143143
.boxed()
144144
}
145145

146-
fn next_iteration<'a>(&'a self, readonly: bool, job_id: JobId) -> BoxFuture<'a, Result<HashSet<Id>, String>> {
146+
fn next_iteration<'a>(
147+
&'a self,
148+
readonly: bool,
149+
job_id: JobId,
150+
) -> BoxFuture<'a, Result<HashSet<Id>, String>> {
147151
async move {
148152
let job = match self
149153
.jobs

src/apps/hnsw/partition/tests.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,16 @@ impl TestEnvironment {
203203

204204
/// Create the HNSW graph from cells
205205
pub async fn index_cells(&self) -> Result<(), String> {
206+
println!(
207+
"Indexing cells, schemas {:?}",
208+
self.morpheus
209+
.as_ref()
210+
.unwrap()
211+
.schema_container
212+
.all_morpheus_schemas()
213+
.await
214+
.unwrap()
215+
);
206216
if self.morpheus.is_none() {
207217
return Err("Morpheus not initialized".to_string());
208218
}
@@ -224,7 +234,7 @@ impl TestEnvironment {
224234
.await
225235
.map_err(|e| format!("Failed to create job: {:?}", e))?;
226236
partition_svr
227-
.next_iteration(false,job_id)
237+
.next_iteration(false, job_id)
228238
.await
229239
.map_err(|e| format!("Failed to run job with next iteration: {:?}", e))?;
230240
partition_svr

src/graph/id_list.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub enum IdListError {
2121
ContainerCellNotReady,
2222
FormatError,
2323
Unexpected,
24+
CannotFindSchema,
2425
}
2526

2627
pub static ID_LIST_SCHEMA_ID: u32 = 100;
@@ -913,7 +914,6 @@ impl<'a> LocalIdListIterator<'a> {
913914
}
914915
None
915916
}
916-
917917
}
918918

919919
impl<'a> Iterator for LocalIdListIterator<'a> {
@@ -976,7 +976,7 @@ impl<'a> Iterator for LocalIdListSegmentIdIterator<'a> {
976976
error!("Expecting Id but got: {:?}", fields.data[0usize]);
977977
}
978978
} else {
979-
error!("Cannot find next key with next id {:?}", self.next);
979+
error!("Cannot find next key with next id {:?}", self.next);
980980
}
981981
}
982982
None
@@ -1060,7 +1060,13 @@ impl<'a> LocalIdList<'a> {
10601060
return Ok(Err(IdListError::ContainerCellNotReady));
10611061
}
10621062
}
1063-
_ => Ok(Err(IdListError::FormatError)),
1063+
_ => {
1064+
error!(
1065+
"Cannot find type list id for container id {:?}",
1066+
self.container_id
1067+
);
1068+
Ok(Err(IdListError::FormatError))
1069+
}
10641070
}
10651071
}
10661072

@@ -1093,6 +1099,10 @@ impl<'a> LocalIdList<'a> {
10931099
if let SharedValue::Id(list_id) = id_list_pair[*ID_TYPES_LIST_ID] {
10941100
Some(*list_id)
10951101
} else {
1102+
error!(
1103+
"Cannot find list id for schema {} in type list id {:?}",
1104+
self.schema_id, id_list_pair
1105+
);
10961106
None
10971107
}
10981108
})
@@ -1109,9 +1119,20 @@ impl<'a> LocalIdList<'a> {
11091119
if let SharedValue::Array(ref type_list) = type_list_cell.data[0u64] {
11101120
if let Some(list_id) = self.find_schema_list_id(type_list) {
11111121
return Ok(Ok(list_id));
1122+
} else {
1123+
error!(
1124+
"Cannot find schema {}, for type list {:?}",
1125+
self.schema_id, type_list
1126+
);
1127+
Ok(Err(IdListError::CannotFindSchema))
11121128
}
1129+
} else {
1130+
error!(
1131+
"Schema list format error for type list id {:?}",
1132+
type_list_id
1133+
);
1134+
Ok(Err(IdListError::FormatError))
11131135
}
1114-
Ok(Err(IdListError::FormatError))
11151136
}
11161137

11171138
/// Returns an iterator over all IDs in the list

src/server/schema/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub struct SchemaContainer {
4040
neb_mata: Arc<NebServerMeta>,
4141
}
4242

43-
#[derive(Clone)]
43+
#[derive(Clone, Debug)]
4444
pub struct MorpheusSchema {
4545
pub id: u32,
4646
pub name: String,

0 commit comments

Comments
 (0)