Skip to content

Commit f50268b

Browse files
committed
Fixup
# Conflicts: # nativelink-store/src/redis_store.rs
1 parent 735a2e1 commit f50268b

5 files changed

Lines changed: 22 additions & 129 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ where
10921092
warn!(state = ?awaited_action.state(), "Action already assigned");
10931093
return Err(make_err!(Code::Aborted, "Action already assigned"));
10941094
}
1095-
stage.clone()
1095+
(stage.clone(), false)
10961096
}
10971097
UpdateOperationType::UpdateWithError(err) => {
10981098
// Don't count a backpressure failure as an attempt for an action.

nativelink-store/src/redis_store.rs

Lines changed: 1 addition & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ use std::ops::Index;
2626
use std::sync::{Arc, Weak};
2727
use std::time::Instant;
2828

29-
use crate::cas_utils::is_zero_digest;
30-
use crate::redis_utils::ft_aggregate;
3129
use async_trait::async_trait;
3230
use bytes::Bytes;
3331
use const_format::formatcp;
@@ -1652,85 +1650,7 @@ where
16521650
where
16531651
K: SchedulerIndexProvider + Send,
16541652
{
1655-
let index_values: Vec<_> = index.iter().map(|k| k.index_value()).collect();
1656-
let sanitized_fields: Vec<String> = index_values
1657-
.iter()
1658-
.map(|v| try_sanitize(v.as_ref()))
1659-
.map(|s| s.clone().unwrap_or_default().to_string())
1660-
.collect();
1661-
let index_name = format!(
1662-
"{}",
1663-
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1664-
);
1665-
1666-
let client = self.get_client().await?;
1667-
1668-
let query = if sanitized_fields.is_empty() {
1669-
"*".to_string()
1670-
} else {
1671-
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_fields.join("|"))
1672-
};
1673-
1674-
let result: RedisValue = client
1675-
.client
1676-
.clone()
1677-
.ft_aggregate(
1678-
index_name,
1679-
query,
1680-
FtAggregateOptions {
1681-
pipeline: vec![AggregateOperation::GroupBy {
1682-
fields: vec![format!("@{}", K::INDEX_NAME).into()],
1683-
reducers: vec![SearchReducer {
1684-
func: ReducerFunc::Count,
1685-
args: vec![],
1686-
name: Some("cnt".into()),
1687-
}],
1688-
}],
1689-
..Default::default()
1690-
},
1691-
)
1692-
.await?;
1693-
1694-
if !result.is_array() {
1695-
return Err(Error::new(Code::Internal, "Expected array".to_string()));
1696-
}
1697-
1698-
let lookup: HashMap<&Cow<str>, usize> = index_values
1699-
.iter()
1700-
.enumerate()
1701-
.map(|(i, k)| (k, i))
1702-
.collect();
1703-
let mut counts = vec![0; index.len()];
1704-
1705-
let result = result.into_array();
1706-
if result.len() < 2 {
1707-
return Ok(counts);
1708-
}
1709-
1710-
result
1711-
.into_iter()
1712-
.skip(1)
1713-
.map(|map| map.into_map().unwrap())
1714-
.for_each(|map| {
1715-
let key = map
1716-
.get(&RedisKey::from_static_str(K::INDEX_NAME))
1717-
.err_tip(|| "Missing index field in RedisStore::count_by_index")
1718-
.unwrap();
1719-
1720-
let cnt_value = map
1721-
.get(&RedisKey::from_static_str("cnt"))
1722-
.err_tip(|| "Missing 'cnt' field in RedisStore::count_by_index")
1723-
.unwrap();
1724-
1725-
let count = cnt_value
1726-
.as_usize()
1727-
.err_tip(|| "Count value is not an integer in RedisStore::count_by_index")
1728-
.unwrap();
1729-
1730-
let val = lookup.get(&key.as_str().unwrap()).unwrap_or(&0);
1731-
counts[val.clone()] = count;
1732-
});
1733-
Ok(counts)
1653+
Err(make_err!(Code::Unimplemented, "Not implemented"))
17341654
}
17351655

17361656
async fn search_by_index_prefix<K>(

nativelink-worker/src/local_worker.rs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@ use std::collections::HashMap;
2222
use std::env;
2323
use std::process::Stdio;
2424
use std::sync::{Arc, Weak};
25+
use std::time::Instant;
2526
use futures::future::BoxFuture;
2627
use futures::stream::FuturesUnordered;
2728
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
28-
use nativelink_config::cas_server::{EnvironmentSource, LocalWorkerConfig};
29+
use nativelink_config::cas_server::{EnvironmentSource, ExecutionCompletionBehaviour, LocalWorkerConfig};
2930
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
3031
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
3132
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
3233
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
3334
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
34-
execute_result, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest,
35-
UpdateForWorker,
35+
ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
36+
execute_result,
3637
};
3738
use nativelink_store::fast_slow_store::FastSlowStore;
3839
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
@@ -44,47 +45,19 @@ use nativelink_util::store_trait::Store;
4445
use nativelink_util::{spawn, tls_utils};
4546
use opentelemetry::context::Context;
4647
use tokio::process;
47-
use tokio::sync::{broadcast, mpsc};
4848
use tokio::sync::broadcast::{Receiver, Sender};
49+
use tokio::sync::{broadcast, mpsc};
4950
use tokio::time::sleep;
5051
use tokio_stream::wrappers::UnboundedReceiverStream;
5152
use tonic::Streaming;
5253
use tracing::{Level, debug, error, event, info, info_span, instrument, trace, warn};
53-
54+
use nativelink_util::metrics::{WorkerMetricAttrs, LOCAL_WORKER_METRICS};
5455
use crate::running_actions_manager::{
5556
ExecutionConfiguration, Metrics as RunningActionManagerMetrics, RunningAction,
5657
RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
5758
};
5859
use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
5960
use crate::worker_utils::make_connect_worker_request;
60-
use futures::future::BoxFuture;
61-
use futures::stream::FuturesUnordered;
62-
use futures::{Future, FutureExt, StreamExt, TryFutureExt, select};
63-
use nativelink_config::cas_server::{ExecutionCompletionBehaviour, LocalWorkerConfig};
64-
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
65-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
66-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
67-
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
68-
ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
69-
execute_result,
70-
};
71-
use nativelink_store::fast_slow_store::FastSlowStore;
72-
use nativelink_util::action_messages::{ActionResult, ActionStage, OperationId};
73-
use nativelink_util::common::fs;
74-
use nativelink_util::digest_hasher::DigestHasherFunc;
75-
use nativelink_util::metrics::{LOCAL_WORKER_METRICS, WorkerMetricAttrs};
76-
use nativelink_util::shutdown_guard::ShutdownGuard;
77-
use nativelink_util::store_trait::Store;
78-
use nativelink_util::{spawn, tls_utils};
79-
use opentelemetry::context::Context;
80-
use opentelemetry::{InstrumentationScope, KeyValue, global, metrics};
81-
use tokio::process;
82-
use tokio::sync::broadcast::{Receiver, Sender};
83-
use tokio::sync::mpsc;
84-
use tokio::time::sleep;
85-
use tokio_stream::wrappers::UnboundedReceiverStream;
86-
use tonic::Streaming;
87-
use tracing::{Level, debug, error, event, info, info_span, instrument, warn};
8861

8962
/// Amount of time to wait if we have actions in transit before we try to
9063
/// consider an error to have occurred.
@@ -339,7 +312,7 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke
339312
operation_id: operation_id.clone(),
340313
};
341314
self.metrics.clone().wrap(move |metrics| async move {
342-
metrics.preconditions.wrap(preconditions_met(precondition_script_cfg, &extra_envs))
315+
metrics.wrap_preconditions(preconditions_met(precondition_script_cfg, &extra_envs))
343316
.and_then(|()| running_actions_manager.create_and_add_action(worker_id, start_execute))
344317
.map(move |r| {
345318
// Now that we either failed or registered our action, we can

nativelink-worker/src/running_actions_manager.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,7 @@ impl RunningActionImpl {
13471347
);
13481348
}
13491349

1350-
let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
1350+
let stdout_digest_fut = self.metrics().wrap_upload_stdout(async {
13511351
let start = std::time::Instant::now();
13521352
let data = execution_result.stdout;
13531353
let data_len = data.len();
@@ -1364,7 +1364,7 @@ impl RunningActionImpl {
13641364
);
13651365
Result::<DigestInfo, Error>::Ok(digest)
13661366
});
1367-
let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
1367+
let stderr_digest_fut = self.metrics().wrap_upload_stderr(async {
13681368
let start = std::time::Instant::now();
13691369
let data = execution_result.stderr;
13701370
let data_len = data.len();
@@ -1530,8 +1530,7 @@ impl RunningAction for RunningActionImpl {
15301530
);
15311531
let metrics = self.metrics().clone();
15321532
let upload_fut = metrics
1533-
.upload_results
1534-
.wrap(Self::inner_upload_results(self));
1533+
.wrap_upload_results(Self::inner_upload_results(self));
15351534

15361535
let stall_warn_fut = async {
15371536
let mut elapsed_secs = 0u64;

0 commit comments

Comments
 (0)