Skip to content

Commit 82356e6

Browse files
committed
Add retry logic with exponential backoff for edge creation
- Added retry_edge_creation helper function to handle transaction conflicts - Implements exponential backoff with jitter for NotRealizable/TooManyRetry errors - Up to 5 retries per edge creation attempt - Fixes double Arc wrapping issue in worker task setup - Reduces edge creation failures during high-concurrency imports
1 parent f032d02 commit 82356e6

3 files changed

Lines changed: 1002 additions & 84 deletions

File tree

ID_LIST_FIXES_SUMMARY.md

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# IdList Error Handling Improvements - Summary
2+
3+
## Problem Fixed
4+
5+
The original error messages were vague:
6+
```
7+
[ERROR] Cannot find next key, got cell Err(NotRealizable)
8+
[ERROR] Last segment cell doesn not existed, id None, root Ok(Id {...})
9+
```
10+
11+
## Solution
12+
13+
**Changed iterator return types from `Option<T>` to `Result<Option<T>, TxnError>`** to properly propagate transaction errors:
14+
15+
### Files Modified
16+
17+
1. **`src/graph/id_list.rs`** - Core iterator implementations
18+
- `IdListSegmentIdIterator::next()` and `collect()`
19+
- `IdListSegmentIterator::next()` and `last()`
20+
- `IdListIterator::next()`, `count()`, `collect()`, `last()`, `next_seg()`
21+
- Added detailed diagnostic logging
22+
23+
2. **`src/graph/mod.rs`** - Updated calling code
24+
- Fixed 2 iterator usages to handle `Result`
25+
26+
3. **`src/graph/vertex/mod.rs`** - Updated calling code
27+
- Fixed 1 iterator usage to handle `Result`
28+
29+
### Key Changes
30+
31+
**Before:**
32+
```rust
33+
pub async fn next(&mut self) -> Option<Id> {
34+
let cell = self.txn.read_selected(...).await;
35+
match &cell {
36+
Ok(Some(fields)) => { /* ... */ }
37+
_ => {
38+
error!("Cannot find next key, got cell {:?}", cell)
39+
}
40+
}
41+
None // ❌ Silently swallows errors
42+
}
43+
```
44+
45+
**After:**
46+
```rust
47+
pub async fn next(&mut self) -> Result<Option<Id>, TxnError> {
48+
let cell = self.txn.read_selected(...).await;
49+
match cell {
50+
Ok(Some(fields)) => Ok(Some(id)),
51+
Ok(None) => {
52+
error!("Segment cell not found at level {}, id {:?}",
53+
self.level, current_id);
54+
Ok(None)
55+
}
56+
Err(e) => {
57+
error!("Transaction error reading segment at level {}, id {:?}: {:?}",
58+
self.level, current_id, e);
59+
Err(e) // ✅ Properly propagates errors
60+
}
61+
}
62+
}
63+
```
64+
65+
## Benefits
66+
67+
1. **Better Error Visibility** - Transaction errors now surface with context
68+
2. **Easier Debugging** - Logs include segment level, IDs, and error types
69+
3. **Proper Error Propagation** - Errors bubble up instead of being lost
70+
4. **Production Ready** - Integration tests already cover IdList functionality
71+
72+
## Testing
73+
74+
The IdList module is already thoroughly tested through the graph API in:
75+
- `src/tests/graph/mod.rs::relationship()` - Comprehensive edge operations
76+
- `src/tests/graph/mod.rs::edge_operations()` - Edge creation and retrieval
77+
- `src/tests/graph/mod.rs::graph_traversal()` - Neighborhood queries
78+
79+
All existing tests continue to pass, confirming the improvements don't break functionality.
80+
81+
## Migration Guide
82+
83+
Code using IdList iterators needs to be updated:
84+
85+
```rust
86+
// OLD - won't compile anymore
87+
while let Some(id) = iter.next().await {
88+
// process
89+
}
90+
91+
// NEW - handle Result
92+
loop {
93+
match iter.next().await {
94+
Ok(Some(id)) => {
95+
// process
96+
}
97+
Ok(None) => break,
98+
Err(e) => return Err(e),
99+
}
100+
}
101+
102+
// OR with ? operator
103+
while let Some(id) = iter.next().await? {
104+
// process
105+
}
106+
```
107+
108+
## Status
109+
110+
**All changes compile successfully**
111+
**Existing tests pass**
112+
**Better error diagnostics in place**
113+
114+

src/apps/wikidata/importer.rs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use tokio::task;
2323
use std::time::{Instant, Duration};
2424
use std::sync::atomic::{AtomicUsize, Ordering};
2525
use futures::stream::{FuturesUnordered, StreamExt};
26+
use neb::client::transaction::TxnError;
27+
use crate::graph::LinkVerticesError;
2628

2729
#[derive(Error, Debug)]
2830
pub enum ImportError {
@@ -128,6 +130,56 @@ pub fn create_optimized_line_reader(file_path: &str) -> Result<Box<dyn Iterator<
128130
}
129131
}
130132

133+
/// Helper function to retry edge creation with exponential backoff
134+
/// Handles transaction errors like NotRealizable and TooManyRetry
135+
async fn retry_edge_creation<F, Fut>(
136+
server: Arc<MorpheusServer>,
137+
from_id: Id,
138+
to_id: Id,
139+
edge_data: Option<OwnedMap>,
140+
create_fn: F,
141+
max_retries: u32,
142+
) -> Result<(), String>
143+
where
144+
F: Fn(Arc<MorpheusServer>, Id, Id, Option<OwnedMap>) -> Fut,
145+
Fut: std::future::Future<Output = Result<Result<crate::graph::edge::Edge, LinkVerticesError>, TxnError>>,
146+
{
147+
let mut retry_count = 0;
148+
let base_delay = Duration::from_millis(10);
149+
150+
loop {
151+
match create_fn(server.clone(), from_id, to_id, edge_data.clone()).await {
152+
Ok(Ok(_)) => return Ok(()),
153+
Ok(Err(e)) => {
154+
// Non-transaction error (e.g., schema not found), don't retry
155+
return Err(format!("{:?}", e));
156+
}
157+
Err(e) => {
158+
// Transaction error - check if we should retry
159+
let error_str = format!("{:?}", e);
160+
let should_retry = error_str.contains("NotRealizable")
161+
|| error_str.contains("TooManyRetry")
162+
|| error_str.contains("Conflict")
163+
|| error_str.contains("Transaction");
164+
165+
if should_retry && retry_count < max_retries {
166+
retry_count += 1;
167+
// Exponential backoff with jitter (using retry_count for pseudo-randomness)
168+
let delay = base_delay * (1 << retry_count.min(8)); // Cap at 2^8 = 256 * base_delay
169+
let jitter = Duration::from_millis((retry_count as u64 * 7) % 50); // Simple pseudo-random jitter
170+
let total_delay = delay + jitter;
171+
172+
tokio::time::sleep(total_delay).await;
173+
continue;
174+
} else {
175+
// Max retries reached or non-retriable error
176+
return Err(format!("Failed after {} retries: {:?}", retry_count, e));
177+
}
178+
}
179+
}
180+
}
181+
}
182+
131183
/// Wikidata importer with three-phase import strategy
132184
/// Phase 1: Import all entities and properties as vertices
133185
/// Phase 2: Import statements and create graph edges for Q→Q relationships
@@ -815,7 +867,7 @@ impl WikidataImporter {
815867

816868
// Spawn worker tasks
817869
let mut worker_handles = Vec::new();
818-
let server = Arc::new(self.server.clone());
870+
let server = self.server.clone(); // Already an Arc
819871
let parser = self.parser.clone();
820872
let entity_id_map = self.entity_id_map.clone();
821873

@@ -959,13 +1011,25 @@ impl WikidataImporter {
9591011
literal_id: None,
9601012
}.to_owned_map();
9611013

962-
match server_clone.graph.link(entity_id, "wikidata_link", target_entity_id, &Some(edge_data)).await {
1014+
// Use retry helper for edge creation with exponential backoff
1015+
match retry_edge_creation(
1016+
server_clone,
1017+
entity_id,
1018+
target_entity_id,
1019+
Some(edge_data),
1020+
|srv, from, to, body| {
1021+
async move {
1022+
srv.graph.link(from, "wikidata_link", to, &body).await
1023+
}
1024+
},
1025+
5, // max 5 retries
1026+
).await {
9631027
Ok(_) => {
9641028
edges_created_clone.fetch_add(1, Ordering::Relaxed);
9651029
local_stats.1 += 1;
9661030
}
9671031
Err(e) => {
968-
eprintln!("Worker {}: Failed to create edge {:?} -> {:?}: {:?}", worker_id, entity_id, target_entity_id, e);
1032+
eprintln!("Worker {}: Failed to create edge {:?} -> {:?}: {}", worker_id, entity_id, target_entity_id, e);
9691033
local_stats.3 += 1;
9701034
}
9711035
}

0 commit comments

Comments
 (0)