Skip to content

Commit 664cba8

Browse files
committed
implement the glue
1 parent 6d53906 commit 664cba8

6 files changed

Lines changed: 461 additions & 227 deletions

File tree

src/apps/hnsw/coordinator/mod.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,23 @@ use std::{
55

66
use ahash::HashSet;
77
use bifrost::{
8-
conshash::ConsistentHashing, dispatch_rpc_service_functions, membership::server, raft::client::RaftClient, rpc::cluster::{
8+
conshash::ConsistentHashing,
9+
dispatch_rpc_service_functions,
10+
membership::server,
11+
raft::client::RaftClient,
12+
rpc::cluster::{
913
all_server_ids, broadcast_to_members, broadcast_with_server_ids, client_by_server_id,
10-
}, service
14+
},
15+
service,
1116
};
1217
use dovahkiin::{data_map, types::*};
1318
use futures::future::BoxFuture;
1419
use itertools::Itertools;
15-
use neb::{index::hash::get_hash_id_from_value, query::data_client::IndexedDataClient, ram::{cell::OwnedCell, types::RandValue}};
20+
use neb::{
21+
index::hash::get_hash_id_from_value,
22+
query::data_client::IndexedDataClient,
23+
ram::{cell::OwnedCell, types::RandValue},
24+
};
1625

1726
use crate::{
1827
graph::{partitioner::DefaultPartitioner, GraphEngine},
@@ -24,7 +33,10 @@ use crate::{
2433
use super::{
2534
measurements::MetricEncoding,
2635
partition::{
27-
index::HNSWIndex, schema::{CELL_FIELD_ID, HNSW_VERTEX_SCHEMA_ID}, service::service::AsyncServiceClient as PartitionSvrClient, types::OrderedFloat
36+
index::HNSWIndex,
37+
schema::{CELL_FIELD_ID, HNSW_VERTEX_SCHEMA_ID},
38+
service::service::AsyncServiceClient as PartitionSvrClient,
39+
types::OrderedFloat,
2840
},
2941
};
3042
use bifrost::service_with_id;
@@ -240,10 +252,7 @@ impl Service for HNSWIndexService {
240252
todo!()
241253
}
242254

243-
fn del_cell<'a>(
244-
&'a self,
245-
cell_id: Id,
246-
) -> BoxFuture<'a, Result<(), String>> {
255+
fn del_cell<'a>(&'a self, cell_id: Id) -> BoxFuture<'a, Result<(), String>> {
247256
async move {
248257
// Use the hash index to get the vertex id by the cell id
249258
let cell_id_value = OwnedValue::Id(cell_id);
@@ -253,20 +262,21 @@ impl Service for HNSWIndexService {
253262
.await
254263
.map_err(|e| format!("Failed to get index client: {:?}", e))?
255264
.map_err(|e| format!("Failed to get index client: {:?}", e))?;
256-
self.engine.graph_transaction(DefaultPartitioner, |txn| {
257-
let index_vertex = index_vertex.clone();
258-
async move {
259-
for vid in &index_vertex {
260-
if let Err(e) = txn.remove_vertex(vid).await? {
261-
return Ok(Err(e));
265+
self.engine
266+
.graph_transaction(DefaultPartitioner, |txn| {
267+
let index_vertex = index_vertex.clone();
268+
async move {
269+
for vid in &index_vertex {
270+
if let Err(e) = txn.remove_vertex(vid).await? {
271+
return Ok(Err(e));
272+
}
262273
}
274+
Ok(Ok(()))
263275
}
264-
Ok(Ok(()))
265-
}
266-
})
267-
.await
268-
.map_err(|e| format!("Failed to delete vertex: {:?}", e))?
269-
.map_err(|e| format!("Failed to delete vertex: {:?}", e))?;
276+
})
277+
.await
278+
.map_err(|e| format!("Failed to delete vertex: {:?}", e))?
279+
.map_err(|e| format!("Failed to delete vertex: {:?}", e))?;
270280
Ok(())
271281
}
272282
.boxed()
@@ -282,7 +292,8 @@ impl Service for HNSWIndexService {
282292
) -> BoxFuture<'a, Result<(), String>> {
283293
async move {
284294
let _ = self.del_cell(cell_id).await;
285-
self.new_cell(schema, field_id, cell_id, ef_construction, metric).await
295+
self.new_cell(schema, field_id, cell_id, ef_construction, metric)
296+
.await
286297
}
287298
.boxed()
288299
}
@@ -298,7 +309,11 @@ impl HNSWIndexService {
298309
engine: engine.clone(),
299310
conshash: conshash.clone(),
300311
job_id_counter: AtomicU64::new(1),
301-
indexer: Arc::new(IndexedDataClient::new(engine.neb_client(), conshash, raft_client)),
312+
indexer: Arc::new(IndexedDataClient::new(
313+
engine.neb_client(),
314+
conshash,
315+
raft_client,
316+
)),
302317
})
303318
}
304319
fn server_id(&self) -> u64 {

src/apps/hnsw/coordinator/tests.rs

Lines changed: 103 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ pub struct TestEnvironment {
6666

6767
impl TestEnvironment {
6868
/// Create a new test environment with test vectors
69-
pub fn new(server_port: u32, group_name: &str, partition_id: u64, field_id: u64, schema_id: u32) -> Self {
69+
pub fn new(
70+
server_port: u32,
71+
group_name: &str,
72+
partition_id: u64,
73+
field_id: u64,
74+
schema_id: u32,
75+
) -> Self {
7076
let _ = env_logger::try_init();
7177
Self {
7278
job_counter: RefCell::new(1),
@@ -109,7 +115,9 @@ impl TestEnvironment {
109115
}
110116

111117
pub async fn initialize_server(mut self) -> Self {
112-
let server = start_server(self.server_port, &self.group_name).await.unwrap();
118+
let server = start_server(self.server_port, &self.group_name)
119+
.await
120+
.unwrap();
113121
let server_id = server.neb_server.server_id;
114122
self.morpheus = Some(server.clone());
115123
self.conshash = Some(server.neb_server.consh.clone());
@@ -244,46 +252,64 @@ mod tests {
244252

245253
#[tokio::test]
246254
async fn test_create_index() {
247-
let env = TestEnvironment::new(5000, "test_create_index", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
248-
.with_job_logger()
249-
.initialize_server()
250-
.await
251-
.initialize_schemas()
252-
.await
253-
.unwrap();
255+
let env = TestEnvironment::new(
256+
5000,
257+
"test_create_index",
258+
1,
259+
VECTOR_FIELD_ID,
260+
CELL_SCHEMA_ID,
261+
)
262+
.with_job_logger()
263+
.initialize_server()
264+
.await
265+
.initialize_schemas()
266+
.await
267+
.unwrap();
254268
env.create_index().await.unwrap();
255269
}
256270

257271
#[tokio::test]
258272
async fn test_index_one_cell() {
259-
let env = TestEnvironment::new(5001, "test_index_one_cell", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
260-
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
261-
.with_job_logger()
262-
.initialize_server()
263-
.await
264-
.initialize_schemas()
265-
.await
266-
.unwrap()
267-
.initialize_cells()
268-
.await
269-
.unwrap();
273+
let env = TestEnvironment::new(
274+
5001,
275+
"test_index_one_cell",
276+
1,
277+
VECTOR_FIELD_ID,
278+
CELL_SCHEMA_ID,
279+
)
280+
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
281+
.with_job_logger()
282+
.initialize_server()
283+
.await
284+
.initialize_schemas()
285+
.await
286+
.unwrap()
287+
.initialize_cells()
288+
.await
289+
.unwrap();
270290
env.create_index().await.unwrap();
271291
env.index_cells().await.unwrap();
272292
}
273293

274294
#[tokio::test]
275295
async fn test_index_and_query_one_cell() {
276-
let env = TestEnvironment::new(5002, "test_index_and_query_one_cell", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
277-
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
278-
.with_job_logger()
279-
.initialize_server()
280-
.await
281-
.initialize_schemas()
282-
.await
283-
.unwrap()
284-
.initialize_cells()
285-
.await
286-
.unwrap();
296+
let env = TestEnvironment::new(
297+
5002,
298+
"test_index_and_query_one_cell",
299+
1,
300+
VECTOR_FIELD_ID,
301+
CELL_SCHEMA_ID,
302+
)
303+
.with_test_vectors(vec![vec![1.0, 2.0, 3.0]])
304+
.with_job_logger()
305+
.initialize_server()
306+
.await
307+
.initialize_schemas()
308+
.await
309+
.unwrap()
310+
.initialize_cells()
311+
.await
312+
.unwrap();
287313
env.create_index().await.unwrap();
288314
env.index_cells().await.unwrap();
289315

@@ -296,21 +322,27 @@ mod tests {
296322

297323
#[tokio::test]
298324
async fn test_index_and_query_multiple_cells() {
299-
let env = TestEnvironment::new(5003, "test_index_and_query_multiple_cells", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
300-
.with_test_vectors(vec![
301-
vec![1.0, 2.0, 3.0],
302-
vec![4.0, 5.0, 6.0],
303-
vec![7.0, 8.0, 9.0],
304-
])
305-
.with_job_logger()
306-
.initialize_server()
307-
.await
308-
.initialize_schemas()
309-
.await
310-
.unwrap()
311-
.initialize_cells()
312-
.await
313-
.unwrap();
325+
let env = TestEnvironment::new(
326+
5003,
327+
"test_index_and_query_multiple_cells",
328+
1,
329+
VECTOR_FIELD_ID,
330+
CELL_SCHEMA_ID,
331+
)
332+
.with_test_vectors(vec![
333+
vec![1.0, 2.0, 3.0],
334+
vec![4.0, 5.0, 6.0],
335+
vec![7.0, 8.0, 9.0],
336+
])
337+
.with_job_logger()
338+
.initialize_server()
339+
.await
340+
.initialize_schemas()
341+
.await
342+
.unwrap()
343+
.initialize_cells()
344+
.await
345+
.unwrap();
314346
env.create_index().await.unwrap();
315347
env.index_cells().await.unwrap();
316348

@@ -330,17 +362,23 @@ mod tests {
330362
test_vectors.push(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
331363
}
332364

333-
let env = TestEnvironment::new(5004, "test_index_and_query_many_cells", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
334-
.with_test_vectors(test_vectors)
335-
.with_job_logger()
336-
.initialize_server()
337-
.await
338-
.initialize_schemas()
339-
.await
340-
.unwrap()
341-
.initialize_cells()
342-
.await
343-
.unwrap();
365+
let env = TestEnvironment::new(
366+
5004,
367+
"test_index_and_query_many_cells",
368+
1,
369+
VECTOR_FIELD_ID,
370+
CELL_SCHEMA_ID,
371+
)
372+
.with_test_vectors(test_vectors)
373+
.with_job_logger()
374+
.initialize_server()
375+
.await
376+
.initialize_schemas()
377+
.await
378+
.unwrap()
379+
.initialize_cells()
380+
.await
381+
.unwrap();
344382
env.create_index().await.unwrap();
345383
env.index_cells().await.unwrap();
346384

@@ -355,13 +393,14 @@ mod tests {
355393

356394
#[tokio::test]
357395
async fn test_error_cases() {
358-
let env = TestEnvironment::new(5005, "test_error_cases", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
359-
.with_job_logger()
360-
.initialize_server()
361-
.await
362-
.initialize_schemas()
363-
.await
364-
.unwrap();
396+
let env =
397+
TestEnvironment::new(5005, "test_error_cases", 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
398+
.with_job_logger()
399+
.initialize_server()
400+
.await
401+
.initialize_schemas()
402+
.await
403+
.unwrap();
365404

366405
// Test querying before creating index
367406
let query = vec![1.0, 2.0, 3.0];

0 commit comments

Comments
 (0)