Skip to content

Commit 13447a1

Browse files
committed
Fix embedding idempotency and test stability
1 parent 6e58e93 commit 13447a1

8 files changed

Lines changed: 99 additions & 5 deletions

File tree

src/apps/embedding/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,13 @@ impl EmbeddingIndexer {
188188
vector,
189189
);
190190

191+
// Embedding cell IDs are deterministic per document/schema/field/model, so retries and
192+
// duplicate indexing requests must be idempotent.
191193
self.neb_client
192-
.write_cell(emb_cell)
194+
.upsert_cell(emb_cell)
193195
.await
194-
.map_err(|e| IndexError::Other(format!("Failed to write embedding cell: {:?}", e)))?
195-
.map_err(|e| IndexError::Other(format!("Failed to write embedding cell: {:?}", e)))?;
196+
.map_err(|e| IndexError::Other(format!("Failed to persist embedding cell: {:?}", e)))?
197+
.map_err(|e| IndexError::Other(format!("Failed to persist embedding cell: {:?}", e)))?;
196198

197199
Ok(())
198200
}

src/apps/embedding/tests.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,23 @@ mod provider_tests {
354354

355355
#[cfg(test)]
356356
mod integration_tests {
357-
use dovahkiin::types::{Id, Map, OwnedMap, OwnedValue, Type};
357+
use dovahkiin::types::{Id, Map, OwnedMap, OwnedPrimArray, OwnedValue, Type};
358358
use neb::{
359359
index::embedding::EmbeddingModel,
360+
index::vector::HnswConfig,
360361
ram::schema::{Field, IndexType, Schema},
361362
ram::types::RandValue,
362363
};
363364

365+
use crate::apps::{
366+
embedding::{
367+
EmbeddingIndexer,
368+
schema::{
369+
create_embedding_cell_schema_for_client, embedding_cell_id, vector_field_id,
370+
},
371+
},
372+
hnsw::measurements::MetricEncoding,
373+
};
364374
use crate::tests::start_server;
365375

366376
const TEST_SCHEMA_ID: u32 = 9001;
@@ -802,6 +812,78 @@ mod integration_tests {
802812
}
803813
}
804814

815+
#[tokio::test]
816+
#[ignore] // Run with: cargo test test_embedding_cell_write_is_idempotent -- --ignored --nocapture
817+
async fn test_embedding_cell_write_is_idempotent() {
818+
let server = start_server(6021, "embedding_upsert_regression").await.unwrap();
819+
820+
let _partition = server.init_hnsw_index_partition_service().await.unwrap();
821+
let _hnsw = server.init_hnsw_index_service().await.unwrap();
822+
823+
let runtime = server.current_runtime();
824+
let indexer = EmbeddingIndexer::new(&runtime).await.unwrap();
825+
826+
const DIRECT_SCHEMA_ID: u32 = 9002;
827+
let field_id = bifrost_hasher::hash_str(TEST_FIELD);
828+
let full_model = "test:test-model";
829+
let dimensions = 4usize;
830+
831+
let emb_schema_id = create_embedding_cell_schema_for_client(
832+
server.neb_client.as_ref(),
833+
full_model,
834+
DIRECT_SCHEMA_ID,
835+
field_id,
836+
dimensions,
837+
)
838+
.await
839+
.unwrap();
840+
841+
indexer
842+
.hnsw_coordinator
843+
.new_index(
844+
format!("EMB-{DIRECT_SCHEMA_ID}-{field_id}-{full_model}"),
845+
emb_schema_id,
846+
vector_field_id(),
847+
HnswConfig::default(),
848+
)
849+
.await
850+
.unwrap()
851+
.unwrap();
852+
853+
let doc_id = Id::rand();
854+
let vector = vec![0.5, 0.5, 0.5, 0.5];
855+
856+
indexer
857+
.write_embedding_cell(&doc_id, DIRECT_SCHEMA_ID, field_id, full_model, vector.clone())
858+
.await
859+
.unwrap();
860+
861+
indexer
862+
.write_embedding_cell(&doc_id, DIRECT_SCHEMA_ID, field_id, full_model, vector.clone())
863+
.await
864+
.unwrap();
865+
866+
let emb_cell_id = embedding_cell_id(&doc_id, DIRECT_SCHEMA_ID, field_id, full_model);
867+
server.neb_client.read_cell(emb_cell_id).await.unwrap().unwrap();
868+
869+
let hits = indexer
870+
.hnsw_coordinator
871+
.query_top_k(
872+
emb_schema_id,
873+
vector_field_id(),
874+
OwnedPrimArray::F32(vector),
875+
10,
876+
32,
877+
MetricEncoding::Cosine,
878+
)
879+
.await
880+
.unwrap()
881+
.unwrap();
882+
883+
assert_eq!(hits.len(), 1, "duplicate writes should not create duplicate vector entries");
884+
assert_eq!(hits[0].0, emb_cell_id);
885+
}
886+
805887
/// Test multi-model support on the same field
806888
#[tokio::test]
807889
#[ignore]

src/apps/recovery_integrity_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ pub async fn start_test_server(
242242
Service::RangedIndexer,
243243
Service::Query,
244244
];
245+
server_config.storage.disable_storage_locks = true;
245246

246247
// Configure persistent storage
247248
server_config.storage.backup_storage = Some(config.backup_path());

src/apps/wikidata/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ fn create_test_config(port: u32, group_name: &str, enable_recovery: bool) -> Mor
8181
Service::RangedIndexer,
8282
Service::Query,
8383
];
84+
opts.storage.disable_storage_locks = true;
8485
opts.storage.backup_storage = Some(backup_path.clone());
8586
opts.storage.wal_storage = Some(wal_path);
8687
opts.storage.raft_storage = Some(raft_path);

src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ impl From<HumanReadableMorphesOptions> for MorphesOptions {
101101
index_enabled: hr_options.storage.index_enabled,
102102
raft_storage: hr_options.storage.raft_storage,
103103
undo_log_storage: hr_options.storage.undo_log_storage,
104+
disable_storage_locks: false,
104105
tiered_config: hr_options.storage.tiered_config.map(|tc| {
105106
neb::ram::tiered::TieredConfig {
106107
threshold: tc.threshold as f32,

src/config/tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ storage:
4848
assert_eq!(config.storage.db_size, 1024 * 1024 * 1024); // Should be converted to bytes
4949
assert_eq!(config.server_addr, "127.0.0.1:5400");
5050
assert_eq!(config.group_name, "TestMorpheus");
51+
assert!(!config.storage.disable_storage_locks);
5152
}
5253

5354
#[test]
@@ -81,5 +82,6 @@ storage:
8182

8283
// Verify that tiered memory configuration is None (disabled)
8384
assert!(config.storage.tiered_config.is_none());
85+
assert!(!config.storage.disable_storage_locks);
8486
}
8587
}

src/tests/compute/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,10 @@ impl GasProgram for TestProgram {
175175
accum: &<Self::C as GasCodec>::Accum,
176176
_ctx: &mut ApplyCtx,
177177
) {
178-
*state = *accum;
178+
if *state != *accum {
179+
*state = *accum;
180+
_ctx.updated = true;
181+
}
179182
}
180183

181184
fn scatter(

src/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub fn start_server_with_storage_and_recovery<'a>(
8282
Service::RangedIndexer,
8383
Service::Query,
8484
];
85+
config.storage.disable_storage_locks = true;
8586

8687
// Configure persistent storage if paths provided
8788
// All storage paths are required for full persistence functionality
@@ -137,6 +138,7 @@ pub fn start_server_in_cluster<'a>(
137138
Service::Query,
138139
];
139140
config.storage.index_enabled = true;
141+
config.storage.disable_storage_locks = true;
140142
MorpheusServer::new(config)
141143
}
142144

0 commit comments

Comments
 (0)