Skip to content

Commit d8999dd

Browse files
committed
plumbling
1 parent 015b9b4 commit d8999dd

9 files changed

Lines changed: 155 additions & 32 deletions

File tree

src/apps/hnsw/coordinator.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use futures::future::BoxFuture;
66
use neb::ram::cell::OwnedCell;
77

88
use crate::{
9-
graph::GraphEngine, server::MorpheusServer, traversal::navigation::{meta_cell_schema, META_CELLS_SCHEMA_ID}
9+
graph::GraphEngine,
10+
server::MorpheusServer,
11+
traversal::navigation::{meta_cell_schema, META_CELLS_SCHEMA_ID},
1012
};
1113

1214
use super::meta_index_id;

src/apps/hnsw/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use parking_lot::Mutex;
1818

1919
use crate::{
2020
graph::{edge::Edge, vertex::Vertex, EdgeDirection, GraphEngine},
21-
server::{schema::{MorpheusSchema, EMPTY_FIELDS}, MorpheusServer},
21+
server::{
22+
schema::{MorpheusSchema, EMPTY_FIELDS},
23+
MorpheusServer,
24+
},
2225
traversal::navigation::*,
2326
};
2427

src/apps/hnsw/partition/schema.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use neb::ram::schema::{Field, IndexType, Schema as NebSchema};
88
use neb::ram::types::RandValue;
99
use rayon::vec;
1010

11+
use crate::graph::edge;
1112
use crate::graph::vertex::Vertex;
1213
use crate::server::{schema::*, MorpheusServer};
1314

@@ -74,21 +75,26 @@ pub fn index_cell(index: &HNSWIndex) -> OwnedCell {
7475
}
7576

7677
fn edge_schema(name: &str, level: usize) -> MorpheusSchema {
77-
MorpheusSchema::new(&edge_schema_name(name, level), None, &EMPTY_FIELDS, false)
78+
let edge_schema_name = edge_schema_name(name, level);
79+
let edge_schema_id = hash_str(&edge_schema_name) as u32;
80+
MorpheusSchema::new_edge_with_id(
81+
edge_schema_id,
82+
&edge_schema_name,
83+
None,
84+
&EMPTY_FIELDS,
85+
edge::EdgeAttributes::new(edge::EdgeType::Undirected, false),
86+
false,
87+
)
7888
}
7989

8090
pub const HNSW_VERTEX_SCHEMA_ID: u32 = hash_ident!("_hnsw_vertex");
8191

8292
pub fn hnsw_vertex_schema() -> MorpheusSchema {
83-
MorpheusSchema::new_with_id(
93+
MorpheusSchema::new_vertex_with_id(
8494
HNSW_VERTEX_SCHEMA_ID,
8595
"_hnsw_vertex",
8696
None,
87-
&vec![Field::new_indexed(
88-
CELL,
89-
Type::Id,
90-
vec![IndexType::Hashed],
91-
)],
97+
&vec![Field::new_indexed(CELL, Type::Id, vec![IndexType::Hashed])],
9298
false,
9399
)
94100
}
@@ -114,20 +120,37 @@ pub async fn initialize_schemas(morph: &Arc<MorpheusServer>) -> Result<(), Strin
114120
let schema_id = level_schema.id;
115121
if neb.schema_client.get(&schema_id).await.unwrap().is_none() {
116122
morph
117-
.schema_container
118-
.new_schema(level_schema.clone())
119-
.await
120-
.map_err(|e| format!("Failed to create level {} schema: {:?}", level_schema.name, e))?;
123+
.schema_container
124+
.new_schema(level_schema.clone())
125+
.await
126+
.map_err(|e| {
127+
format!(
128+
"Failed to create level {} schema: {:?}",
129+
level_schema.name, e
130+
)
131+
})?;
121132
}
122133
}
123-
if neb.schema_client.get(&HNSW_VERTEX_SCHEMA_ID).await.unwrap().is_none() {
134+
if neb
135+
.schema_client
136+
.get(&HNSW_VERTEX_SCHEMA_ID)
137+
.await
138+
.unwrap()
139+
.is_none()
140+
{
124141
morph
125142
.schema_container
126143
.new_schema(hnsw_vertex_schema())
127144
.await
128145
.map_err(|e| format!("Failed to create HNSW vertex schema: {:?}", e))?;
129146
}
130-
if neb.schema_client.get(&INDEX_SCHEMA_ID).await.unwrap().is_none() {
147+
if neb
148+
.schema_client
149+
.get(&INDEX_SCHEMA_ID)
150+
.await
151+
.unwrap()
152+
.is_none()
153+
{
131154
neb.new_schema_with_id(index_schema())
132155
.await
133156
.map_err(|e| format!("Failed to create index schema: {:?}", e))?

src/apps/hnsw/partition/search.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ impl HnswOnlinePartition {
458458
loop {
459459
let frontier = &search.frontier;
460460
let query = &search.query;
461+
if frontier.is_empty() {
462+
break;
463+
}
461464
let (selected_vertex, new_distance) = match self.closest_vertex(
462465
frontier,
463466
query,
@@ -754,7 +757,15 @@ impl HnswOnlinePartition {
754757
let mut vertex_cell = format_vertex_cell(vertex.cell).unwrap();
755758
let header = chunks // Take a short by adding the cell right to the chunk
756759
.write_cell(&mut vertex_cell)
757-
.map_err(|e| HNSWIndexError::WriteError(e))?;
760+
.map_err(|e| {
761+
append_job_log(
762+
logger,
763+
job_id,
764+
JobLogLevel::Error,
765+
format!("Failed to write vertex cell: {:?}", e),
766+
);
767+
HNSWIndexError::WriteError(e)
768+
})?;
758769
debug_assert_eq!(header.id(), vertex_id); // Just be safe
759770

760771
// First, determine the level for this new vertex

src/apps/hnsw/partition/tests.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ impl TestEnvironment {
144144
.await
145145
.map_err(|e| format!("Failed to create cell schema: {:?}", e))?
146146
.map_err(|e| format!("Failed to create cell schema: {:?}", e))?;
147-
initialize_schemas(morph).await;
147+
initialize_schemas(morph).await.unwrap();
148148
Ok(self)
149149
}
150150

@@ -167,7 +167,7 @@ impl TestEnvironment {
167167
Ok(self)
168168
}
169169

170-
pub async fn create_index(&mut self) -> Result<(), String> {
170+
pub async fn create_index(&self) -> Result<(), String> {
171171
if self.morpheus.is_none() {
172172
return Err("Morpheus not initialized".to_string());
173173
}
@@ -188,7 +188,7 @@ impl TestEnvironment {
188188
}
189189

190190
/// Create the HNSW graph from cells
191-
pub async fn index_cells(&mut self) -> Result<(), String> {
191+
pub async fn index_cells(&self) -> Result<(), String> {
192192
if self.morpheus.is_none() {
193193
return Err("Morpheus not initialized".to_string());
194194
}
@@ -212,8 +212,14 @@ impl TestEnvironment {
212212
)
213213
.await
214214
.map_err(|e| format!("Failed to create job: {:?}", e))?;
215-
partition_svr.next_iteration(job_id).await.map_err(|e| format!("Failed to run job with next iteration: {:?}", e))?;
216-
partition_svr.index_cell(job_id, cell_id).await.map_err(|e| format!("Failed to index cell: {:?}", e))?;
215+
partition_svr
216+
.next_iteration(job_id)
217+
.await
218+
.map_err(|e| format!("Failed to run job with next iteration: {:?}", e))?;
219+
partition_svr
220+
.index_cell(job_id, cell_id)
221+
.await
222+
.map_err(|e| format!("Failed to index cell: {:?}", e))?;
217223
// let mut search = partition.new_search(CELL_SCHEMA_ID, VECTOR_FIELD_ID, &vector, 1, MetricEncoding::L2)
218224
// .await
219225
// .map_err(|e| format!("Failed to create search: {:?}", e))?;
@@ -351,26 +357,66 @@ mod tests {
351357

352358
// Test that the data cell contains the vector
353359
if let OwnedValue::Map(map) = &data_cell.data {
354-
assert_eq!(map.get_by_key_id(field_id), &OwnedValue::PrimArray(test_vector.vector.clone()));
360+
assert_eq!(
361+
map.get_by_key_id(field_id),
362+
&OwnedValue::PrimArray(test_vector.vector.clone())
363+
);
355364
} else {
356365
panic!("Data cell data is not a map");
357366
}
358367
}
359368

360369
#[tokio::test]
361370
async fn server_startup() {
362-
TestEnvironment::new(
363-
5000, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
371+
let env = TestEnvironment::new(5000, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
364372
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
365373
.with_job_logger()
366374
.with_job_id(JobId::new(1, 1))
367-
.initialize_server().await
368-
.initialize_schemas().await.unwrap()
369-
.initialize_cells().await.unwrap()
370-
.create_index().await.unwrap();
375+
.initialize_server()
376+
.await
377+
.initialize_schemas()
378+
.await
379+
.unwrap()
380+
.initialize_cells()
381+
.await
382+
.unwrap();
383+
env.create_index().await.unwrap();
371384
}
372385
}
386+
#[tokio::test]
387+
async fn server_startup() {
388+
let env = TestEnvironment::new(5000, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
389+
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
390+
.with_job_logger()
391+
.with_job_id(JobId::new(1, 1))
392+
.initialize_server()
393+
.await
394+
.initialize_schemas()
395+
.await
396+
.unwrap()
397+
.initialize_cells()
398+
.await
399+
.unwrap();
400+
env.create_index().await.unwrap();
401+
}
373402

403+
#[tokio::test]
404+
async fn index_one_cell() {
405+
let env = TestEnvironment::new(5001, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
406+
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
407+
.with_job_logger()
408+
.with_job_id(JobId::new(1, 1))
409+
.initialize_server()
410+
.await
411+
.initialize_schemas()
412+
.await
413+
.unwrap()
414+
.initialize_cells()
415+
.await
416+
.unwrap();
417+
env.create_index().await.unwrap();
418+
env.index_cells().await.unwrap();
419+
}
374420
// Example of how to set up a functional test (commented out since it needs user implementation)
375421
/*
376422
#[async_std::test]

src/job/logger.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ pub fn append_job_log(
2929
level: JobLogLevel,
3030
message: String,
3131
) {
32+
if cfg!(debug_assertions) {
33+
println!("[{:?} - {:?}] {}", job_id, level, message);
34+
}
3235
match job_logger.get(&job_id) {
3336
Some(job_logger) => {
3437
job_logger.lock().push(JobLog {

src/job/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ pub fn initialize_job_run(report: &Arc<Mutex<JobReport>>) {
6565

6666
impl JobId {
6767
pub fn new(coordinator_server_id: u64, coordinator_job_id: u64) -> Self {
68-
Self { coordinator_server_id, coordinator_job_id }
68+
Self {
69+
coordinator_server_id,
70+
coordinator_job_id,
71+
}
6972
}
7073
}

src/server/schema/mod.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,55 @@ lazy_static! {
5555
}
5656

5757
impl MorpheusSchema {
58-
pub fn new_with_id<'a>(
58+
pub fn new_with_id_and_type<'a>(
5959
id: u32,
6060
name: &'a str,
6161
key_field: Option<&Vec<String>>,
6262
fields: &Vec<Field>,
6363
is_dynamic: bool,
64+
schema_type: GraphSchema,
6465
) -> MorpheusSchema {
6566
MorpheusSchema {
6667
id,
6768
name: name.to_string(),
6869
key_field: key_field.cloned(),
6970
fields: fields.clone(),
70-
schema_type: GraphSchema::Unspecified,
71+
schema_type,
7172
is_dynamic,
7273
}
7374
}
7475

76+
pub fn new_with_id<'a>(
77+
id: u32,
78+
name: &'a str,
79+
key_field: Option<&Vec<String>>,
80+
fields: &Vec<Field>,
81+
is_dynamic: bool,
82+
) -> MorpheusSchema {
83+
MorpheusSchema::new_with_id_and_type(id, name, key_field, fields, is_dynamic, GraphSchema::Unspecified)
84+
}
85+
86+
pub fn new_vertex_with_id<'a>(
87+
id: u32,
88+
name: &'a str,
89+
key_field: Option<&Vec<String>>,
90+
fields: &Vec<Field>,
91+
is_dynamic: bool,
92+
) -> MorpheusSchema {
93+
MorpheusSchema::new_with_id_and_type(id, name, key_field, fields, is_dynamic, GraphSchema::Vertex)
94+
}
95+
96+
pub fn new_edge_with_id<'a>(
97+
id: u32,
98+
name: &'a str,
99+
key_field: Option<&Vec<String>>,
100+
fields: &Vec<Field>,
101+
edge_attr: EdgeAttributes,
102+
is_dynamic: bool,
103+
) -> MorpheusSchema {
104+
MorpheusSchema::new_with_id_and_type(id, name, key_field, fields, is_dynamic, GraphSchema::Edge(edge_attr))
105+
}
106+
75107
pub fn new<'a>(
76108
name: &'a str,
77109
key_field: Option<&Vec<String>>,

src/traversal/navigation/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,4 @@ impl NavigationService {
192192
}
193193
}
194194

195-
dispatch_rpc_service_functions!(NavigationService);
195+
dispatch_rpc_service_functions!(NavigationService);

0 commit comments

Comments
 (0)