Skip to content

Commit 4b9d0cd

Browse files
committed
Existence cache should invalidate if get_part from underlying store fails.
1 parent 39b5d05 commit 4b9d0cd

20 files changed

Lines changed: 43 additions & 182 deletions

Cargo.lock

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
use core::ops::{Deref, DerefMut};
1616
use core::sync::atomic::{AtomicU64, Ordering};
1717
use core::time::Duration;
18-
use std::collections::{HashMap, HashSet};
18+
use std::collections::HashMap;
1919
use std::sync::Arc;
2020
use std::time::{Instant, UNIX_EPOCH};
2121

@@ -397,7 +397,7 @@ impl ApiWorkerSchedulerImpl {
397397
for (idx, platform_properties) in actions.iter().enumerate() {
398398
let candidates = self
399399
.capability_index
400-
.find_matching_workers(platform_properties);
400+
.find_matching_workers(platform_properties, full_worker_logging);
401401
if candidates.is_empty() {
402402
continue;
403403
}

nativelink-scheduler/src/awaited_action_db/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@ use core::cmp;
1616
use core::ops::Bound;
1717
use core::time::Duration;
1818
use std::collections::HashMap;
19-
use std::iter::Map;
2019
use std::sync::Arc;
2120

2221
pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,9 @@ use core::ops::{Bound, RangeBounds};
1616
use core::time::Duration;
1717
use std::collections::hash_map::Entry;
1818
use std::collections::{BTreeMap, BTreeSet, HashMap};
19-
use std::iter::Map;
2019
use std::sync::Arc;
2120

22-
use async_lock::{Mutex, RwLock};
21+
use async_lock::RwLock;
2322
use futures::{FutureExt, Stream};
2423
use nativelink_config::stores::EvictionPolicy;
2524
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};

nativelink-scheduler/src/simple_scheduler.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -43,13 +43,12 @@ use tokio::time::Duration;
4343
use tracing::{debug, error, info, info_span, warn};
4444

4545
use crate::api_worker_scheduler::ApiWorkerScheduler;
46-
use crate::awaited_action_db::{AwaitedAction, AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
46+
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
4747
use crate::platform_property_manager::PlatformPropertyManager;
4848
use crate::simple_scheduler_state_manager::{SchedulerStateManager, SimpleSchedulerStateManager};
4949
use crate::worker::{ActionInfoWithProps, ActionsState, Worker, WorkerState, WorkerTimestamp};
5050
use crate::worker_registry::WorkerRegistry;
5151
use crate::worker_scheduler::WorkerScheduler;
52-
use nativelink_util::metrics::StoreType::Metrics;
5352
use serde::Serialize;
5453
use nativelink_util::metrics::EXECUTION_METRICS;
5554

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1092,7 +1092,7 @@ where
10921092
warn!(state = ?awaited_action.state(), "Action already assigned");
10931093
return Err(make_err!(Code::Aborted, "Action already assigned"));
10941094
}
1095-
stage.clone()
1095+
(stage.clone(), false)
10961096
}
10971097
UpdateOperationType::UpdateWithError(err) => {
10981098
// Don't count a backpressure failure as an attempt for an action.

nativelink-scheduler/src/store_awaited_action_db.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ use nativelink_util::store_trait::{
3737
};
3838
use nativelink_util::task::JoinHandleDropGuard;
3939
use tokio::sync::Notify;
40-
use tracing::{error, info, warn};
40+
use tracing::{error, warn};
4141

4242
use crate::awaited_action_db::{
4343
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,

nativelink-scheduler/src/worker.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ use nativelink_util::metrics_utils::{AsyncCounterWrapper, CounterWithTime, FuncC
2727
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
2828
use serde::Serialize;
2929
use tokio::sync::mpsc::UnboundedSender;
30-
use crate::awaited_action_db::AwaitedAction;
3130

3231
pub type WorkerTimestamp = u64;
3332

@@ -61,7 +60,7 @@ pub struct PendingActionInfoData {
6160
pub action_info: ActionInfoWithProps,
6261
}
6362

64-
#[derive(Serialize)]
63+
#[derive(Serialize, Debug)]
6564
pub struct WorkerState {
6665
pub id: WorkerId,
6766
pub platform_properties: PlatformProperties,
@@ -71,7 +70,7 @@ pub struct WorkerState {
7170
pub is_draining: bool,
7271
}
7372

74-
#[derive(Serialize)]
73+
#[derive(Serialize, Debug)]
7574
pub struct ActionsState {
7675
pub executing: usize,
7776
pub queued: usize,

nativelink-store/src/existence_cache_store.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -293,6 +293,8 @@ impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> {
293293
.existence_cache
294294
.insert(digest, ExistenceItem(digest.size_bytes()))
295295
.await;
296+
} else {
297+
let _ = self.existence_cache.remove(&digest).await;
296298
}
297299
result
298300
}

nativelink-store/src/fast_slow_store.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ use core::ops::Range;
1818
use core::pin::Pin;
1919
use std::collections::HashMap;
2020
use std::ffi::OsString;
21-
use std::sync::{Arc, LazyLock, Weak};
21+
use std::sync::{Arc, Weak};
2222

2323
use async_trait::async_trait;
2424
use futures::{FutureExt, join};
@@ -34,7 +34,6 @@ use nativelink_util::store_trait::{
3434
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
3535
UploadSizeInfo, slow_update_store_with_file,
3636
};
37-
use opentelemetry::{InstrumentationScope, global, metrics};
3837
use parking_lot::Mutex;
3938
use tokio::sync::OnceCell;
4039
use tracing::{debug, trace, warn};

0 commit comments

Comments
 (0)