Skip to content

Commit bbfa7ab

Browse files
committed
fix all obilivous errors
1 parent f8d5045 commit bbfa7ab

5 files changed

Lines changed: 94 additions & 19 deletions

File tree

src/apps/hnsw/partition/search.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,18 @@ impl HnswOnlinePartition {
222222

223223
match cell.data[0usize] {
224224
SharedValue::Id(id) => Some(*id),
225+
SharedValue::Null | SharedValue::NA => {
226+
append_job_log(
227+
logger,
228+
job_id,
229+
JobLogLevel::Error,
230+
format!(
231+
"Cannot find vector cell in vertex id {:?} with body {:?}",
232+
vertex_id, cell.data
233+
),
234+
);
235+
None
236+
}
225237
_ => {
226238
append_job_log(
227239
logger,
@@ -315,15 +327,19 @@ impl HnswOnlinePartition {
315327
let selected_vertex = frontier
316328
.iter()
317329
.filter_map(|vertex_id| {
318-
self.get_vertex_distance(
330+
let (_cell_id, distance) = self.get_vertex_distance(
319331
vertex_id, query, field_id, metric, metadata, chunks, logger, job_id,
320-
)
332+
)?;
333+
Some((*vertex_id, distance))
321334
})
322-
.filter(|(id, _)| !metadata.visited.contains(id))
335+
.filter(|(vid, _)| !metadata.visited.contains(vid))
323336
// Greedy select the closest vertex
324337
.min_by(|(_, d1), (_, d2)| d1.partial_cmp(d2).unwrap());
325338
let selected_vertex = match selected_vertex {
326-
Some(vertex) => vertex,
339+
Some(vertex) => {
340+
info!("Selected vertex at level {} is {:?}", metadata.level, vertex);
341+
vertex
342+
},
327343
None => {
328344
append_job_log(
329345
logger,
@@ -800,12 +816,22 @@ impl HnswOnlinePartition {
800816
let level_entry = metadata.level_entries.get(&level);
801817
if let Some(entry_point) = level_entry {
802818
entry_points.push(*entry_point);
819+
info!("Level {} entry point is {:?}", level, entry_point);
803820
}
804821
if level == 0 || level_entry.is_none() {
805822
// For the lowest level, we need to use the search history
806823
entry_points.extend(metadata.history.iter().map(|(id, _)| *id).take(1));
824+
info!(
825+
"Entry point at level {} is {:?}, history is {:?}",
826+
level,
827+
entry_points,
828+
metadata
829+
.history
830+
.iter()
831+
.map(|(id, _)| *id)
832+
.collect::<Vec<_>>()
833+
);
807834
}
808-
809835
if entry_points.is_empty() {
810836
// No entry points, we can't connect anything
811837
append_job_log(

src/apps/hnsw/partition/tests.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use dovahkiin::data_map;
77
use dovahkiin::types::{Id, OwnedMap, OwnedPrimArray, OwnedValue, Type};
88
use lightning::map::{Map, PtrHashMap};
99
use neb::client::AsyncClient;
10+
use neb::exec::query::env;
1011
use neb::ram::cell::{self, OwnedCell};
1112
use neb::ram::chunk::Chunks;
1213
use neb::ram::schema::{Field, Schema as NebSchema};
@@ -74,6 +75,7 @@ pub struct TestEnvironment {
7475
impl TestEnvironment {
7576
/// Create a new test environment with test vectors
7677
pub fn new(server_port: u32, partition_id: u64, field_id: u64, schema_id: u32) -> Self {
78+
env_logger::init();
7779
Self {
7880
job_id: None,
7981
server_port,
@@ -97,7 +99,7 @@ impl TestEnvironment {
9799
.enumerate()
98100
.map(|(i, vector)| {
99101
let cell_higher = self.partition_id; // Most share the same partition ID
100-
let cell_lower = 2000 + i as u64; // Unique cell IDs
102+
let cell_lower = 1000 + i as u64; // Unique cell IDs
101103
TestVector::new(cell_higher, cell_lower, OwnedPrimArray::F32(vector))
102104
})
103105
.collect();
@@ -426,6 +428,24 @@ async fn index_one_cell() {
426428
env.create_index().await.unwrap();
427429
env.index_cells().await.unwrap();
428430
}
431+
432+
#[tokio::test]
433+
async fn index_two_cells() {
434+
let env = TestEnvironment::new(5001, 1, VECTOR_FIELD_ID, CELL_SCHEMA_ID)
435+
.with_test_vectors(vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0]])
436+
.with_job_logger()
437+
.with_job_id(JobId::new(1, 1))
438+
.initialize_server()
439+
.await
440+
.initialize_schemas()
441+
.await
442+
.unwrap()
443+
.initialize_cells()
444+
.await
445+
.unwrap();
446+
env.create_index().await.unwrap();
447+
env.index_cells().await.unwrap();
448+
}
429449
// Example of how to set up a functional test (commented out since it needs user implementation)
430450
/*
431451
#[async_std::test]

src/graph/id_list.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -198,31 +198,40 @@ impl<'a> IdList<'a> {
198198
if let Some(id) = self.cached_type_list_id {
199199
return Ok(Ok(id));
200200
}
201-
201+
let field_id = self.field_id;
202202
// Read the type list ID from container
203203
let fields = match self
204204
.txn
205-
.read_selected(self.container_id, vec![self.field_id])
205+
.read_selected(self.container_id, vec![field_id])
206206
.await?
207207
{
208208
Some(f) => f,
209209
None => return Ok(Err(IdListError::ContainerCellNotFound)),
210210
};
211-
211+
let mut create_new_type_list = async || {
212+
let result = self.create_new_type_list().await?;
213+
if let Ok(new_id) = result {
214+
self.cached_type_list_id = Some(new_id);
215+
}
216+
Ok(result)
217+
};
212218
match &fields[0usize] {
213219
OwnedValue::Id(id) => {
214220
if id.is_unit_id() && ensure_container {
215-
let result = self.create_new_type_list().await?;
216-
if let Ok(new_id) = result {
217-
self.cached_type_list_id = Some(new_id);
218-
}
219-
Ok(result)
221+
create_new_type_list().await
220222
} else {
221223
self.cached_type_list_id = Some(*id);
222224
Ok(Ok(*id))
223225
}
224226
}
225-
_ => Ok(Err(IdListError::FormatError)),
227+
OwnedValue::Null | OwnedValue::NA => {
228+
trace!("Creating new type list for {}", field_id);
229+
create_new_type_list().await
230+
}
231+
_ => {
232+
error!("Failed to get type list id, got {:?}", fields);
233+
Ok(Err(IdListError::FormatError))
234+
}
226235
}
227236
}
228237

@@ -253,9 +262,12 @@ impl<'a> IdList<'a> {
253262
ensure_container: bool,
254263
) -> Result<Result<Id, IdListError>, TxnError> {
255264
// Read the type list cell
256-
let mut type_list_cell = match self.txn.read(type_list_id).await? {
265+
let type_list_cell = match self.txn.read(type_list_id).await? {
257266
Some(cell) => cell,
258-
None => return Ok(Err(IdListError::FormatError)),
267+
None => {
268+
error!("Failed to get type list cell, id {:?}", type_list_id);
269+
return Ok(Err(IdListError::FormatError));
270+
}
259271
};
260272

261273
// Find existing list for this schema
@@ -264,6 +276,10 @@ impl<'a> IdList<'a> {
264276
return Ok(Ok(list_id));
265277
}
266278
} else {
279+
error!(
280+
"Failed to get type list, index {:?} got {:?}",
281+
*ID_TYPES_MAP_ID, type_list_cell
282+
);
267283
return Ok(Err(IdListError::FormatError));
268284
}
269285

@@ -312,6 +328,10 @@ impl<'a> IdList<'a> {
312328
self.txn.update(type_list_cell).await?;
313329
Ok(Ok(list_id))
314330
} else {
331+
error!(
332+
"Failed to get type list, index {:?} got {:?}",
333+
*ID_TYPES_MAP_ID, type_list_cell
334+
);
315335
Ok(Err(IdListError::FormatError))
316336
}
317337
}
@@ -490,9 +510,11 @@ impl<'a> IdList<'a> {
490510
};
491511
array.remove(index);
492512
} else {
513+
error!("Failed to get list, index {:?} got {:?}", *LIST_KEY_ID, map);
493514
return Ok(Err(IdListError::FormatError));
494515
}
495516
} else {
517+
error!("Failed to get list, {:?}", seg.data);
496518
return Ok(Err(IdListError::FormatError));
497519
}
498520
self.txn.update(seg).await.unwrap();

src/graph/vertex/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ where
6060
let (type_list_id, schemas_ids) =
6161
match IdList::cell_types(&txn, id, field_id).await? {
6262
Some(t) => t,
63-
None => return Ok(Err(RemoveError::FormatError)),
63+
None => {
64+
error!("Failed to get type list, {:?}", id);
65+
return Ok(Err(RemoveError::FormatError));
66+
}
6467
};
6568
for schema_id in schemas_ids {
6669
let mut id_list =

src/job/logger.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ pub fn append_job_log(
4444
});
4545
}
4646
None => {
47-
error!("Failed to get job logger by job id: {:?}", job_id);
47+
error!(
48+
"Failed to get job logger by job id: {:?}, inserting new job logger",
49+
job_id
50+
);
51+
job_logger.try_insert(job_id, Arc::new(Mutex::new(Vec::new())));
4852
}
4953
}
5054
}

0 commit comments

Comments
 (0)