Skip to content

Commit 529df6b

Browse files
committed
accelerate by not reading vertex when reading bodyless edge
1 parent bbfa7ab commit 529df6b

6 files changed

Lines changed: 391 additions & 76 deletions

File tree

src/apps/hnsw/partition/search.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ impl HnswOnlinePartition {
327327
let selected_vertex = frontier
328328
.iter()
329329
.filter_map(|vertex_id| {
330+
// TODO: might need to hold the cell_id for the selected vertex
330331
let (_cell_id, distance) = self.get_vertex_distance(
331332
vertex_id, query, field_id, metric, metadata, chunks, logger, job_id,
332333
)?;
@@ -337,9 +338,12 @@ impl HnswOnlinePartition {
337338
.min_by(|(_, d1), (_, d2)| d1.partial_cmp(d2).unwrap());
338339
let selected_vertex = match selected_vertex {
339340
Some(vertex) => {
340-
info!("Selected vertex at level {} is {:?}", metadata.level, vertex);
341+
info!(
342+
"Selected vertex at level {} is {:?}",
343+
metadata.level, vertex
344+
);
341345
vertex
342-
},
346+
}
343347
None => {
344348
append_job_log(
345349
logger,
@@ -887,15 +891,15 @@ impl HnswOnlinePartition {
887891
.await
888892
{
889893
Ok(Ok(_)) => {
890-
append_job_log(
891-
logger,
892-
job_id,
893-
JobLogLevel::Info,
894-
format!(
895-
"Connected vertex {:?} to neighbor {:?} at level {}",
896-
vertex_id, neighbor_id, level
897-
),
898-
);
894+
// append_job_log(
895+
// logger,
896+
// job_id,
897+
// JobLogLevel::Info,
898+
// format!(
899+
// "Connected vertex {:?} to neighbor {:?} at level {}",
900+
// vertex_id, neighbor_id, level
901+
// ),
902+
// );
899903
}
900904
Ok(Err(e)) => {
901905
append_job_log(
@@ -1018,15 +1022,15 @@ impl HnswOnlinePartition {
10181022
let (best_id, _, _) = candidates_to_consider[best_candidate_idx];
10191023
selected.push(best_id);
10201024

1021-
append_job_log(
1022-
logger,
1023-
job_id,
1024-
JobLogLevel::Info,
1025-
format!(
1026-
"Added diverse neighbor {:?} to selection (distance: {})",
1027-
best_id, best_min_distance
1028-
),
1029-
);
1025+
// append_job_log(
1026+
// logger,
1027+
// job_id,
1028+
// JobLogLevel::Info,
1029+
// format!(
1030+
// "Added diverse neighbor {:?} to selection (distance: {})",
1031+
// best_id, best_min_distance
1032+
// ),
1033+
// );
10301034

10311035
// Remove this candidate from consideration
10321036
candidates_to_consider.remove(best_candidate_idx);

src/apps/hnsw/partition/tests.rs

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub struct TestEnvironment {
7575
impl TestEnvironment {
7676
/// Create a new test environment with test vectors
7777
pub fn new(server_port: u32, partition_id: u64, field_id: u64, schema_id: u32) -> Self {
78-
env_logger::init();
78+
let _ = env_logger::try_init();
7979
Self {
8080
job_id: None,
8181
server_port,
@@ -446,6 +446,129 @@ async fn index_two_cells() {
446446
env.create_index().await.unwrap();
447447
env.index_cells().await.unwrap();
448448
}
449+
450+
#[tokio::test]
451+
async fn index_three_cells() {
452+
let env = TestEnvironment::new(5001, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
453+
.with_test_vectors(vec![
454+
vec![1.0, 2.0, 3.0],
455+
vec![4.0, 5.0, 6.0],
456+
vec![7.0, 8.0, 9.0],
457+
])
458+
.with_job_logger()
459+
.with_job_id(JobId::new(1, 1))
460+
.initialize_server()
461+
.await
462+
.initialize_schemas()
463+
.await
464+
.unwrap()
465+
.initialize_cells()
466+
.await
467+
.unwrap();
468+
env.create_index().await.unwrap();
469+
env.index_cells().await.unwrap();
470+
}
471+
472+
#[tokio::test]
473+
async fn index_tens_of_cells() {
474+
// Create 50 test vectors
475+
let mut test_vectors = Vec::with_capacity(50);
476+
for i in 0..50 {
477+
test_vectors.push(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
478+
}
479+
480+
let env = TestEnvironment::new(5002, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
481+
.with_test_vectors(test_vectors)
482+
.with_job_logger()
483+
.with_job_id(JobId::new(1, 1))
484+
.initialize_server()
485+
.await
486+
.initialize_schemas()
487+
.await
488+
.unwrap()
489+
.initialize_cells()
490+
.await
491+
.unwrap();
492+
493+
env.create_index().await.unwrap();
494+
env.index_cells().await.unwrap();
495+
}
496+
497+
#[tokio::test]
498+
async fn index_hundreds_of_cells() {
499+
// Create 500 test vectors
500+
let mut test_vectors = Vec::with_capacity(500);
501+
for i in 0..500 {
502+
test_vectors.push(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
503+
}
504+
505+
let env = TestEnvironment::new(5003, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
506+
.with_test_vectors(test_vectors)
507+
.with_job_logger()
508+
.with_job_id(JobId::new(1, 1))
509+
.initialize_server()
510+
.await
511+
.initialize_schemas()
512+
.await
513+
.unwrap()
514+
.initialize_cells()
515+
.await
516+
.unwrap();
517+
518+
env.create_index().await.unwrap();
519+
env.index_cells().await.unwrap();
520+
}
521+
522+
// #[tokio::test]
523+
async fn index_thousands_of_cells() {
524+
// Create 2000 test vectors
525+
let mut test_vectors = Vec::with_capacity(2000);
526+
for i in 0..2000 {
527+
test_vectors.push(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
528+
}
529+
530+
let env = TestEnvironment::new(5004, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
531+
.with_test_vectors(test_vectors)
532+
.with_job_logger()
533+
.with_job_id(JobId::new(1, 1))
534+
.initialize_server()
535+
.await
536+
.initialize_schemas()
537+
.await
538+
.unwrap()
539+
.initialize_cells()
540+
.await
541+
.unwrap();
542+
543+
env.create_index().await.unwrap();
544+
env.index_cells().await.unwrap();
545+
}
546+
547+
// #[tokio::test]
548+
async fn index_many_thousands_of_cells() {
549+
// Create 5000 test vectors
550+
let mut test_vectors = Vec::with_capacity(5000);
551+
for i in 0..5000 {
552+
test_vectors.push(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
553+
}
554+
555+
let env = TestEnvironment::new(5005, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
556+
.with_test_vectors(test_vectors)
557+
.with_job_logger()
558+
.with_job_id(JobId::new(1, 1))
559+
.initialize_server()
560+
.await
561+
.initialize_schemas()
562+
.await
563+
.unwrap()
564+
.initialize_cells()
565+
.await
566+
.unwrap();
567+
568+
env.create_index().await.unwrap();
569+
env.index_cells().await.unwrap();
570+
}
571+
449572
// Example of how to set up a functional test (commented out since it needs user implementation)
450573
/*
451574
#[async_std::test]

src/apps/hnsw/partition/utils.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,7 @@ pub fn merge_history(
3232
history: &RingBuffer<(Id, Distance)>,
3333
new_history: &RingBuffer<(Id, Distance)>,
3434
) -> RingBuffer<(Id, Distance)> {
35-
debug_assert_eq!(history.len(), new_history.len());
36-
let mut merged_history = RingBuffer::new(history.len());
37-
let mut buffer = Vec::with_capacity(history.len() << 1);
38-
buffer.extend(history.to_vec());
39-
buffer.extend(new_history.to_vec());
40-
buffer.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
41-
if !buffer.is_empty() {
42-
merged_history.extend_from_slice(&buffer[..buffer.len()]);
43-
}
44-
merged_history
35+
history.merge_sorted(new_history, |a, b| b.1.partial_cmp(&a.1).unwrap())
4536
}
4637

4738
pub fn data_partition_key(field_id: u64, cell: &OwnedCell) -> Result<u64, HNSWIndexError> {

src/graph/edge/bilateral.rs

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures::{FutureExt, TryFutureExt};
44
use neb::client::transaction::{Transaction, TxnError};
55
use neb::ram::cell::OwnedCell;
66
use neb::ram::types::Id;
7+
use std::future;
78
use std::sync::Arc;
89

910
use super::super::id_list::IdList;
@@ -37,50 +38,70 @@ pub trait BilateralEdge: TEdge + Sync + Send {
3738
txn: &'a Transaction,
3839
id: Id,
3940
) -> BoxFuture<'a, Result<Result<Self::Edge, EdgeError>, TxnError>> {
40-
txn.read(id)
41-
.map_ok(move |trace_cell| {
42-
let trace_cell = match trace_cell {
43-
Some(cell) => cell,
44-
None => return Err(EdgeError::CellNotFound),
45-
};
46-
let cell_schema_type = match schemas.schema_type(trace_cell.header.schema) {
47-
Some(t) => t,
48-
None => return Err(EdgeError::CannotFindSchema),
49-
};
50-
let mut a_id = Id::unit_id();
51-
let mut b_id = Id::unit_id();
52-
let edge_cell = match cell_schema_type {
53-
GraphSchema::Vertex => {
54-
if vertex_field == Self::vertex_a_field() {
55-
a_id = vertex_id;
56-
b_id = id;
57-
} else if vertex_field == Self::vertex_b_field() {
58-
b_id = vertex_id;
59-
a_id = id;
60-
} else {
61-
return Err(EdgeError::WrongVertexField);
62-
}
63-
None
64-
}
65-
GraphSchema::Edge(edge_attrs) => {
66-
if edge_attrs.edge_type == Self::edge_type() {
67-
if let (&OwnedValue::Id(e_a_id), &OwnedValue::Id(e_b_id)) = (
68-
&trace_cell.data[Self::edge_a_field()],
69-
&trace_cell.data[Self::edge_b_field()],
70-
) {
71-
a_id = e_a_id;
72-
b_id = e_b_id;
41+
// println!("getting edge from id: {:?}, schema_id: {:?}", id, schema_id);
42+
let edge_schema = schemas.schema_type(schema_id).unwrap();
43+
if let GraphSchema::Edge(edge_attrs) = edge_schema {
44+
if edge_attrs.has_body {
45+
return txn
46+
.read(id)
47+
.map_ok(move |trace_cell| {
48+
println!("trace_cell: {:?}", trace_cell);
49+
let trace_cell = match trace_cell {
50+
Some(cell) => cell,
51+
None => return Err(EdgeError::CellNotFound),
52+
};
53+
let cell_schema_type = match schemas.schema_type(trace_cell.header.schema) {
54+
Some(t) => t,
55+
None => return Err(EdgeError::CannotFindSchema),
56+
};
57+
let mut a_id = Id::unit_id();
58+
let mut b_id = Id::unit_id();
59+
let edge_cell = match cell_schema_type {
60+
GraphSchema::Vertex => {
61+
if vertex_field == Self::vertex_a_field() {
62+
a_id = vertex_id;
63+
b_id = id;
64+
} else if vertex_field == Self::vertex_b_field() {
65+
b_id = vertex_id;
66+
a_id = id;
67+
} else {
68+
return Err(EdgeError::WrongVertexField);
69+
}
70+
None
7371
}
74-
Some(trace_cell)
75-
} else {
76-
return Err(EdgeError::WrongEdgeType);
77-
}
78-
}
79-
_ => return Err(EdgeError::WrongSchema),
80-
};
81-
Ok(Self::build_edge(a_id, b_id, schema_id, edge_cell))
82-
})
72+
GraphSchema::Edge(edge_attrs) => {
73+
if edge_attrs.edge_type == Self::edge_type() {
74+
if let (&OwnedValue::Id(e_a_id), &OwnedValue::Id(e_b_id)) = (
75+
&trace_cell.data[Self::edge_a_field()],
76+
&trace_cell.data[Self::edge_b_field()],
77+
) {
78+
a_id = e_a_id;
79+
b_id = e_b_id;
80+
}
81+
Some(trace_cell)
82+
} else {
83+
return Err(EdgeError::WrongEdgeType);
84+
}
85+
}
86+
_ => return Err(EdgeError::WrongSchema),
87+
};
88+
Ok(Self::build_edge(a_id, b_id, schema_id, edge_cell))
89+
})
90+
.boxed();
91+
} else {
92+
let a_id = vertex_id;
93+
let b_id = id;
94+
async move {
95+
Ok(Ok(Self::build_edge(a_id, b_id, schema_id, None)))
96+
}
97+
.boxed()
98+
}
99+
} else {
100+
async move {
101+
Ok(Err(EdgeError::WrongSchema))
102+
}
83103
.boxed()
104+
}
84105
}
85106

86107
fn link<'a>(

src/graph/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,7 @@ impl<P: Partitioner> GraphTransaction<P> {
733733
id,
734734
)
735735
.await?;
736+
// println!("edge: {:?}", edge);
736737
match edge {
737738
Ok(edge) => {
738739
if let Some(opposite_id) = edge.vertex_id_opposite(vertex_id) {

0 commit comments

Comments
 (0)