Skip to content

Commit b397c52

Browse files
committed
Fixup
1 parent 5520226 commit b397c52

5 files changed

Lines changed: 25 additions & 147 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ where
10921092
warn!(state = ?awaited_action.state(), "Action already assigned");
10931093
return Err(make_err!(Code::Aborted, "Action already assigned"));
10941094
}
1095-
stage.clone()
1095+
(stage.clone(), false)
10961096
}
10971097
UpdateOperationType::UpdateWithError(err) => {
10981098
// Don't count a backpressure failure as an attempt for an action.

nativelink-store/src/redis_store.rs

Lines changed: 4 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,15 @@ use core::pin::Pin;
1919
use core::str::FromStr;
2020
use core::time::Duration;
2121
use std::borrow::Cow;
22-
use std::collections::HashMap;
23-
use std::ops::Index;
2422
use std::sync::{Arc, Weak};
2523
use std::time::Instant;
2624

27-
use crate::cas_utils::is_zero_digest;
28-
use crate::redis_utils::ft_aggregate;
2925
use async_trait::async_trait;
3026
use bytes::Bytes;
3127
use const_format::formatcp;
32-
use fred::clients::SubscriberClient;
33-
use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface};
34-
use fred::prelude::{Client, EventInterface, HashesInterface, RediSearchInterface};
35-
use fred::types::config::{
36-
Config as RedisConfig, ConnectionConfig, PerformanceConfig, ReconnectPolicy, UnresponsiveConfig,
37-
};
38-
use fred::types::redisearch::{
39-
AggregateOperation, FtAggregateOptions, FtCreateOptions, IndexKind, Load, ReducerFunc,
40-
SearchField, SearchReducer, SearchSchema, SearchSchemaKind, WithCursor,
41-
};
42-
use fred::types::scan::Scanner;
43-
use fred::types::scripts::Script;
44-
use fred::types::{Builder, Key as RedisKey, Map as RedisMap, SortOrder, Value as RedisValue};
45-
use futures::stream::FuturesUnordered;
46-
use futures::{FutureExt, Stream, StreamExt, TryStreamExt, future};
47-
use itertools::{Itertools, izip};
28+
use futures::stream::{self, FuturesUnordered};
29+
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future};
30+
use itertools::izip;
4831
use nativelink_config::stores::{RedisMode, RedisSpec};
4932
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
5033
use nativelink_metric::MetricsComponent;
@@ -1454,85 +1437,7 @@ impl<C: Clone + ConnectionLike + Sync + Send + 'static, P: RedisPatternSubscribe
14541437
where
14551438
K: SchedulerIndexProvider + Send,
14561439
{
1457-
let index_values: Vec<_> = index.iter().map(|k| k.index_value()).collect();
1458-
let sanitized_fields: Vec<String> = index_values
1459-
.iter()
1460-
.map(|v| try_sanitize(v.as_ref()))
1461-
.map(|s| s.clone().unwrap_or_default().to_string())
1462-
.collect();
1463-
let index_name = format!(
1464-
"{}",
1465-
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1466-
);
1467-
1468-
let client = self.get_client().await?;
1469-
1470-
let query = if sanitized_fields.is_empty() {
1471-
"*".to_string()
1472-
} else {
1473-
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_fields.join("|"))
1474-
};
1475-
1476-
let result: RedisValue = client
1477-
.client
1478-
.clone()
1479-
.ft_aggregate(
1480-
index_name,
1481-
query,
1482-
FtAggregateOptions {
1483-
pipeline: vec![AggregateOperation::GroupBy {
1484-
fields: vec![format!("@{}", K::INDEX_NAME).into()],
1485-
reducers: vec![SearchReducer {
1486-
func: ReducerFunc::Count,
1487-
args: vec![],
1488-
name: Some("cnt".into()),
1489-
}],
1490-
}],
1491-
..Default::default()
1492-
},
1493-
)
1494-
.await?;
1495-
1496-
if !result.is_array() {
1497-
return Err(Error::new(Code::Internal, "Expected array".to_string()));
1498-
}
1499-
1500-
let lookup: HashMap<&Cow<str>, usize> = index_values
1501-
.iter()
1502-
.enumerate()
1503-
.map(|(i, k)| (k, i))
1504-
.collect();
1505-
let mut counts = vec![0; index.len()];
1506-
1507-
let result = result.into_array();
1508-
if result.len() < 2 {
1509-
return Ok(counts);
1510-
}
1511-
1512-
result
1513-
.into_iter()
1514-
.skip(1)
1515-
.map(|map| map.into_map().unwrap())
1516-
.for_each(|map| {
1517-
let key = map
1518-
.get(&RedisKey::from_static_str(K::INDEX_NAME))
1519-
.err_tip(|| "Missing index field in RedisStore::count_by_index")
1520-
.unwrap();
1521-
1522-
let cnt_value = map
1523-
.get(&RedisKey::from_static_str("cnt"))
1524-
.err_tip(|| "Missing 'cnt' field in RedisStore::count_by_index")
1525-
.unwrap();
1526-
1527-
let count = cnt_value
1528-
.as_usize()
1529-
.err_tip(|| "Count value is not an integer in RedisStore::count_by_index")
1530-
.unwrap();
1531-
1532-
let val = lookup.get(&key.as_str().unwrap()).unwrap_or(&0);
1533-
counts[val.clone()] = count;
1534-
});
1535-
Ok(counts)
1440+
Err(make_err!(Code::Unimplemented, "Not implemented"))
15361441
}
15371442

15381443
async fn search_by_index_prefix<K>(

nativelink-worker/src/local_worker.rs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@ use std::collections::HashMap;
2121
use std::env;
2222
use std::process::Stdio;
2323
use std::sync::{Arc, Weak};
24+
use std::time::Instant;
2425
use futures::future::BoxFuture;
2526
use futures::stream::FuturesUnordered;
2627
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
27-
use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig};
28+
use nativelink_config::cas_server::{EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig};
2829
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
2930
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
3031
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
3132
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
3233
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
33-
execute_result, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest,
34-
UpdateForWorker,
34+
ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
35+
execute_result,
3536
};
3637
use nativelink_store::fast_slow_store::FastSlowStore;
3738
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
@@ -43,47 +44,19 @@ use nativelink_util::store_trait::Store;
4344
use nativelink_util::{spawn, tls_utils};
4445
use opentelemetry::context::Context;
4546
use tokio::process;
46-
use tokio::sync::{broadcast, mpsc};
4747
use tokio::sync::broadcast::{Receiver, Sender};
48+
use tokio::sync::{broadcast, mpsc};
4849
use tokio::time::sleep;
4950
use tokio_stream::wrappers::UnboundedReceiverStream;
5051
use tonic::Streaming;
5152
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
52-
53+
use nativelink_util::metrics::{WorkerMetricAttrs, LOCAL_WORKER_METRICS};
5354
use crate::running_actions_manager::{
5455
ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
5556
RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
5657
};
5758
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
5859
use crate::worker_utils::make_connect_worker_request;
59-
use futures::future::BoxFuture;
60-
use futures::stream::FuturesUnordered;
61-
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
62-
use nativelink_config::cas_server::{ExecutionCompletionBehaviour, LocalWorkerConfig};
63-
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
64-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
65-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
66-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
67-
ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
68-
execute_result,
69-
};
70-
use nativelink_store::fast_slow_store::FastSlowStore;
71-
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
72-
use nativelink_util::common::fs;
73-
use nativelink_util::digest_hasher::DigestHasherFunc;
74-
use nativelink_util::metrics::{LOCAL_WORKER_METRICS, WorkerMetricAttrs};
75-
use nativelink_util::shutdown_guard::ShutdownGuard;
76-
use nativelink_util::store_trait::Store;
77-
use nativelink_util::{spawn, tls_utils};
78-
use opentelemetry::context::Context;
79-
use opentelemetry::{InstrumentationScope, KeyValue, global, metrics};
80-
use tokio::process;
81-
use tokio::sync::broadcast::{Receiver, Sender};
82-
use tokio::sync::mpsc;
83-
use tokio::time::sleep;
84-
use tokio_stream::wrappers::UnboundedReceiverStream;
85-
use tonic::Streaming;
86-
use tracing::{Level, debug, error, event, info, info_span, instrument, warn};
8760

8861
/// Amount of time to wait if we have actions in transit before we try to
8962
/// consider an error to have occurred.
@@ -338,7 +311,7 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke
338311
operation_id: operation_id.clone(),
339312
};
340313
self.metrics.clone().wrap(move |metrics| async move {
341-
metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs))
314+
metrics.wrap_preconditions(preconditions_met(precondition_script_cfg, &extra_envs))
342315
.and_then(|()| running_actions_manager.create_and_add_action(worker_id, start_execute))
343316
.map(move |r| {
344317
// Now that we either failed or registered our action, we can

nativelink-worker/src/running_actions_manager.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,7 @@ impl RunningActionImpl {
13471347
);
13481348
}
13491349

1350-
let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1350+
let stdout_digest_fut = self.metrics().wrap_upload_stdout(async {
13511351
let start = std::time::Instant::now();
13521352
let data = execution_result.stdout;
13531353
let data_len = data.len();
@@ -1364,7 +1364,7 @@ impl RunningActionImpl {
13641364
);
13651365
Result::<DigestInfo, Error>::Ok(digest)
13661366
});
1367-
let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1367+
let stderr_digest_fut = self.metrics().wrap_upload_stderr(async {
13681368
let start = std::time::Instant::now();
13691369
let data = execution_result.stderr;
13701370
let data_len = data.len();
@@ -1530,8 +1530,7 @@ impl RunningAction for RunningActionImpl {
15301530
);
15311531
let metrics = self.metrics().clone();
15321532
let upload_fut = metrics
1533-
.upload_results
1534-
.wrap(Self::inner_upload_results(self));
1533+
.wrap_upload_results(Self::inner_upload_results(self));
15351534

15361535
let stall_warn_fut = async {
15371536
let mut elapsed_secs = 0u64;

0 commit comments

Comments
 (0)