
Commit 1500f0d

feat: Add tiered memory recovery test and no-embedding flag
1 parent 2a25183 commit 1500f0d

5 files changed: 353 additions & 51 deletions

scripts/test_tiered_memory_recovery.sh

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
#!/bin/bash
# Tiered Memory Recovery Test Script
#
# This script tests system recovery when storage consumption exceeds physical memory.
# It uses a small physical memory limit (256MB) and imports a larger dataset.
# The embedding index is disabled to focus on storage/tiered memory behavior.
#
# Usage: ./scripts/test_tiered_memory_recovery.sh [max_entities]
#   max_entities: Number of entities to import (default: 50000, which should exceed 256MB)

set -e
set -o pipefail # Without this, a failing import would be masked by the `tee` in its pipeline

# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
WIKIDATA_FILE="${WIKIDATA_FILE:-/mnt/micron/wikidata20160104.json}"
MAX_ENTITIES="${1:-50000}" # Default increased to ensure we exceed 256MB
STORAGE_BASE="/tmp/tiered_memory_test"
IMPORT_CONFIG_FILE="$STORAGE_BASE/import_config.yaml"
RECOVERY_CONFIG_FILE="$STORAGE_BASE/recovery_config.yaml"
PORT=7201 # Different port to avoid conflicts
VERIFICATION_LOG="$STORAGE_BASE/verification.log"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m' # No Color

log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

log_phase() {
    echo ""
    echo -e "${CYAN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}"
    echo -e "${CYAN}  $1${NC}"
    echo -e "${CYAN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}"
    echo ""
}

# Check prerequisites
check_prerequisites() {
    log_info "Checking prerequisites..."

    if [[ ! -f "$WIKIDATA_FILE" ]]; then
        log_error "Wikidata dump file not found: $WIKIDATA_FILE"
        log_info "Set the WIKIDATA_FILE environment variable to the correct path"
        exit 1
    fi

    if ! command -v cargo &> /dev/null; then
        log_error "cargo not found. Please install Rust."
        exit 1
    fi

    # Check for the binary (debug build)
    if [[ ! -f "$PROJECT_DIR/target/debug/wikidata_cli" ]]; then
        log_warn "Debug binary not found at $PROJECT_DIR/target/debug/wikidata_cli"
        log_info "Please run: cargo build --bin wikidata_cli --no-default-features --features cpu_optimized"
        # Don't exit here: the user may still build the binary before the import phase needs it
    fi

    log_success "Prerequisites check passed"
}

# Clean up previous test data
cleanup() {
    log_info "Cleaning up previous test data..."
    rm -rf "$STORAGE_BASE"
    mkdir -p "$STORAGE_BASE/backup"
    mkdir -p "$STORAGE_BASE/wal"
    mkdir -p "$STORAGE_BASE/raft"
    mkdir -p "$STORAGE_BASE/undo"
    log_success "Cleanup complete"
}

# Create configuration files with a tight memory limit
create_configs() {
    log_info "Creating configuration files with 256MB physical memory limit..."

    # Import config
    cat > "$IMPORT_CONFIG_FILE" << EOF
server_addr: 127.0.0.1:$PORT
group_name: TieredMemoryTest
meta_members:
  - 127.0.0.1:$PORT
storage:
  chunk_count: 16
  total_size: "64GB"
  backup_storage: "$STORAGE_BASE/backup"
  wal_storage: "$STORAGE_BASE/wal"
  raft_storage: "$STORAGE_BASE/raft"
  undo_log_storage: "$STORAGE_BASE/undo"
  enable_recovery: false
services:
  - Cell
  - Transaction
  - RangedIndexer
  - Query
index_enabled: true
tiered_config:
  threshold: 0.8
  physical_memory_limit: "256MB"
EOF

    # Recovery config: identical to the import config except enable_recovery: true
    cat > "$RECOVERY_CONFIG_FILE" << EOF
server_addr: 127.0.0.1:$PORT
group_name: TieredMemoryTest
meta_members:
  - 127.0.0.1:$PORT
storage:
  chunk_count: 16
  total_size: "64GB"
  backup_storage: "$STORAGE_BASE/backup"
  wal_storage: "$STORAGE_BASE/wal"
  raft_storage: "$STORAGE_BASE/raft"
  undo_log_storage: "$STORAGE_BASE/undo"
  enable_recovery: true
services:
  - Cell
  - Transaction
  - RangedIndexer
  - Query
index_enabled: true
tiered_config:
  threshold: 0.8
  physical_memory_limit: "256MB"
EOF

    log_success "Configuration files created"
}

# Phase 1: Import data
run_import() {
    log_phase "PHASE 1: Importing $MAX_ENTITIES entities (Memory Limit: 256MB)"

    cd "$PROJECT_DIR"

    log_info "Starting import with:"
    log_info "  • No embedding indexing (--no-embedding)"
    log_info "  • Batch size: 100"
    log_info "  • Workers: 4"
    log_info "  • Max entities: $MAX_ENTITIES"
    echo ""

    # Run the import. RUST_LOG surfaces memory-usage logs where available,
    # falling back to standard info-level output. With pipefail set above,
    # a failing import is not masked by tee.
    if ! RUST_LOG=morpheus::apps::wikidata=info,info "$PROJECT_DIR/target/debug/wikidata_cli" import \
        "$WIKIDATA_FILE" \
        --config "$IMPORT_CONFIG_FILE" \
        --max-entities "$MAX_ENTITIES" \
        --batch-size 100 \
        --workers 4 \
        --no-embedding 2>&1 | tee "$STORAGE_BASE/import.log"; then
        log_error "Import failed with a non-zero exit code (see $STORAGE_BASE/import.log)"
        exit 1
    fi

    log_success "Import phase completed successfully"

    # Give the WAL a moment to flush before simulating a restart
    log_info "Waiting for WAL flush..."
    sleep 5
}

# Phase 2: Recovery verification
run_recovery_test() {
    log_phase "PHASE 2: Testing Recovery (Simulating restart)"

    cd "$PROJECT_DIR"

    log_info "Starting recovery verification..."
    log_info "Attempting to query entities from the recovered store..."

    # To simulate recovery we only need to start the server against the recovery
    # config (enable_recovery: true) and read data back. Of wikidata_cli's
    # subcommands (Import, ImportAndQuery, Query), 'query' fits best: it starts
    # the server, runs recovery, and reads existing data.

    # Query a known entity (Q42, Douglas Adams) to verify it survived the restart.
    # This assumes Q42 appears within the first 50k entities of the dump.
    if ! RUST_LOG=info "$PROJECT_DIR/target/debug/wikidata_cli" query \
        --config "$RECOVERY_CONFIG_FILE" \
        entity --entity-id "Q42" 2>&1 | tee -a "$VERIFICATION_LOG"; then
        log_error "Recovery verification failed! Server might have failed to start or find data."
        exit 1
    fi

    log_success "Recovery verification successful!"
}

# Main execution flow
main() {
    check_prerequisites
    cleanup
    create_configs
    run_import
    run_recovery_test
}

main
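
If the threshold in tiered_config is interpreted as a fraction of the physical limit (an assumption; the commit does not document its semantics), eviction to the backup tier should begin once resident data crosses roughly 0.8 × 256MB ≈ 205MB, which the default 50k-entity import is intended to exceed. A typical invocation, following the script's own header comment (the build command comes from the prerequisites check; the entity count is just an example):

# Build the CLI once, then run the test with a larger 100k-entity import
cargo build --bin wikidata_cli --no-default-features --features cpu_optimized
./scripts/test_tiered_memory_recovery.sh 100000

# Point the script at a different dump via the WIKIDATA_FILE override
WIKIDATA_FILE=/mnt/micron/wikidata20160104.json ./scripts/test_tiered_memory_recovery.sh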

src/apps/wikidata/cli.rs

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,9 @@ pub enum Commands {
         /// Maximum number of entities to import (optional, imports all if not specified)
         #[arg(long)]
         max_entities: Option<usize>,
+        /// Disable embedding indexing (default: false)
+        #[arg(long, default_value = "false")]
+        no_embedding: bool,
     },
     /// Import and immediately start query mode
     ImportAndQuery {
@@ -66,6 +69,9 @@ pub enum Commands {
         /// Maximum number of entities to import (optional, imports all if not specified)
         #[arg(long)]
         max_entities: Option<usize>,
+        /// Disable embedding indexing (default: false)
+        #[arg(long, default_value = "false")]
+        no_embedding: bool,
     },
     /// Query operations
     Query {
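
For orientation, the new flag is consumed through the with_no_embedding builder added in importer.rs below. A minimal sketch of the likely wiring, assuming a conventional clap dispatch (the handler shape, the config loader, and the entry-point names are assumptions; only no_embedding, with_max_entities, and with_no_embedding are confirmed by this commit):

// Hypothetical dispatch in the CLI's main; names marked "assumed" are not part of this commit.
match cli.command {
    Commands::Import { file, config, max_entities, no_embedding, .. } => {
        let mut import_config = load_import_config(&config)?; // assumed config loader
        if let Some(max) = max_entities {
            import_config = import_config.with_max_entities(max);
        }
        // `--no-embedding` propagates here and disables the description-field embedding index
        import_config = import_config.with_no_embedding(no_embedding);
        start_import(&file, import_config).await?; // assumed importer entry point
    }
    _ => { /* other subcommands unchanged */ }
}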

src/apps/wikidata/importer.rs

Lines changed: 37 additions & 13 deletions
@@ -53,10 +53,13 @@ pub struct ImportConfig {
     pub db_batch_size: Option<usize>, // How many items to process concurrently (default: 64)
     pub items_per_future: Option<usize>, // How many items each future handles (default: 1 = 64 concurrent futures)
     pub parse_thread_pool_size: Option<usize>, // Rayon thread pool size (default: num_cpus)
-
+
     /// Maximum number of entities to import (None = import all)
     /// Used for testing with partial data from large datasets
     pub max_entities: Option<usize>,
+
+    /// Disable embedding indexing for description field
+    pub no_embedding: bool,
 }

 impl ImportConfig {
@@ -73,9 +76,10 @@ impl ImportConfig {
             batch_size,
             memory_limit_mb,
             db_batch_size: None,
-            items_per_future: None,
             parse_thread_pool_size: None,
             max_entities: None,
+            items_per_future: None,
+            no_embedding: false,
         }
     }

@@ -96,12 +100,18 @@
         self.parse_thread_pool_size = Some(threads);
         self
     }
-
+
     /// Set maximum number of entities to import (for testing with partial data)
     pub fn with_max_entities(mut self, max: usize) -> Self {
         self.max_entities = Some(max);
         self
     }
+
+    /// Set whether to disable embedding indexing
+    pub fn with_no_embedding(mut self, no_embedding: bool) -> Self {
+        self.no_embedding = no_embedding;
+        self
+    }
 }

 /// Statistics from import process
@@ -345,13 +355,14 @@ impl WikidataImporter {
         // Resolve concurrency settings from config (use defaults if not specified)
         let db_batch_size = config.db_batch_size.unwrap_or(64);
         let items_per_future = config.items_per_future.unwrap_or(64);
-
+
         // Cap parse threads at 32 even on large systems
         // CPU-bound parsing has diminishing returns beyond 32 threads due to:
         // - Cache contention
         // - Memory bandwidth saturation
         // - Scheduler overhead
-        let num_threads = config.parse_thread_pool_size
+        let num_threads = config
+            .parse_thread_pool_size
             .unwrap_or_else(|| num_cpus::get().min(32));

         // Create custom thread pool for parsing operations
@@ -365,7 +376,10 @@
             .expect("Failed to build Rayon thread pool");

         println!("📊 Concurrency Settings:");
-        println!("   Parse threads: {} (capped at 32 for optimal CPU cache usage)", num_threads);
+        println!(
+            "   Parse threads: {} (capped at 32 for optimal CPU cache usage)",
+            num_threads
+        );
         println!("   Thread stack size: 8MB (prevents stack overflow on complex entities)");
         println!(
             "   DB batch size: {} (concurrent operations)",
@@ -424,7 +438,8 @@
         });

         // Create EntityMetadataCell schema (or reuse if exists)
-        let entity_metadata_schema = create_entity_metadata_schema();
+        // Pass the negation of no_embedding to enable/disable the embedding index
+        let entity_metadata_schema = create_entity_metadata_schema(!self.config.no_embedding);
         let entity_metadata_id = self
             .server
             .schema_container
@@ -576,8 +591,11 @@

         // Ensure embedding indexes are created for entity metadata description field
         // This is needed when schemas are reused from previous imports
-        super::schema::ensure_wikidata_embedding_indexes(&self.server).await
-            .map_err(|e| ImportError::Schema(format!("Failed to ensure embedding indexes: {}", e)))?;
+        super::schema::ensure_wikidata_embedding_indexes(&self.server)
+            .await
+            .map_err(|e| {
+                ImportError::Schema(format!("Failed to ensure embedding indexes: {}", e))
+            })?;

         Ok(())
     }
@@ -1441,11 +1459,14 @@
             line_count += 1;
             bytes_read += line.len() as u64 + 1; // +1 for newline character
             current_batch.push(line);
-
+
             // Check if we've reached the max entities limit (for testing with partial data)
             if let Some(max) = self.config.max_entities {
                 if line_count >= max {
-                    println!("📊 Reached max_entities limit: {} lines (entity phase)", max);
+                    println!(
+                        "📊 Reached max_entities limit: {} lines (entity phase)",
+                        max
+                    );
                     break;
                 }
             }
@@ -1912,11 +1933,14 @@
             line_count += 1;
             bytes_read += line.len() as u64 + 1; // +1 for newline character
             current_batch.push(line);
-
+
             // Check if we've reached the max entities limit (for testing with partial data)
             if let Some(max) = self.config.max_entities {
                 if line_count >= max {
-                    println!("📊 Reached max_entities limit: {} lines (statement phase)", max);
+                    println!(
+                        "📊 Reached max_entities limit: {} lines (statement phase)",
+                        max
+                    );
                     break;
                 }
             }
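
And for context on the new create_entity_metadata_schema(bool) signature: a plausible sketch of how the boolean could gate the description-field embedding index in src/apps/wikidata/schema.rs. Everything here except the signature, the cell name, and the description field is an assumption; the real schema/index API is not shown in this commit:

// Sketch only; Schema::new, add_field, and add_embedding_index are assumed names.
pub fn create_entity_metadata_schema(enable_embedding: bool) -> Schema {
    let mut schema = Schema::new("EntityMetadataCell");
    schema.add_field("description", FieldType::String); // assumed field API
    if enable_embedding {
        // The importer passes !config.no_embedding, so --no-embedding skips this branch
        schema.add_embedding_index("description"); // assumed index API
    }
    schema
}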
