@@ -16,10 +16,9 @@ use core::borrow::BorrowMut;
1616use core:: cmp:: { max, min} ;
1717use core:: ops:: Range ;
1818use core:: pin:: Pin ;
19- use core:: sync:: atomic:: { AtomicU64 , Ordering } ;
2019use std:: collections:: HashMap ;
2120use std:: ffi:: OsString ;
22- use std:: sync:: { Arc , Weak } ;
21+ use std:: sync:: { Arc , LazyLock , Weak } ;
2322
2423use async_trait:: async_trait;
2524use futures:: { FutureExt , join} ;
@@ -35,9 +34,11 @@ use nativelink_util::store_trait::{
3534 RemoveItemCallback , Store , StoreDriver , StoreKey , StoreLike , StoreOptimizations ,
3635 UploadSizeInfo , slow_update_store_with_file,
3736} ;
37+ use opentelemetry:: { InstrumentationScope , global, metrics} ;
3838use parking_lot:: Mutex ;
3939use tokio:: sync:: OnceCell ;
4040use tracing:: { debug, trace, warn} ;
41+ use nativelink_util:: metrics:: FAST_SLOW_STORE_METRICS ;
4142
4243// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
4344// there are many copies happening internally.
@@ -57,8 +58,6 @@ pub struct FastSlowStore {
5758 slow_store : Store ,
5859 slow_direction : StoreDirection ,
5960 weak_self : Weak < Self > ,
60- #[ metric]
61- metrics : FastSlowStoreMetrics ,
6261 // De-duplicate requests for the fast store, only the first streams, others
6362 // are blocked. This may feel like it's causing a slow down of tasks, but
6463 // actually it's faster because we're not downloading the file multiple
@@ -123,7 +122,6 @@ impl FastSlowStore {
123122 slow_store,
124123 slow_direction : spec. slow_direction ,
125124 weak_self : weak_self. clone ( ) ,
126- metrics : FastSlowStoreMetrics :: default ( ) ,
127125 populating_digests : Mutex :: new ( HashMap :: new ( ) ) ,
128126 } )
129127 }
@@ -219,17 +217,15 @@ impl FastSlowStore {
219217 }
220218
221219 if !counted_hit {
222- self . metrics
223- . slow_store_hit_count
224- . fetch_add ( 1 , Ordering :: Acquire ) ;
220+ FAST_SLOW_STORE_METRICS . slow_store_hit_count . add ( 1 , & [ ] ) ;
225221 counted_hit = true ;
226222 }
227223
228224 let output_buf_len = u64:: try_from ( output_buf. len ( ) )
229225 . err_tip ( || "Could not output_buf.len() to u64" ) ?;
230- self . metrics
226+ FAST_SLOW_STORE_METRICS
231227 . slow_store_downloaded_bytes
232- . fetch_add ( output_buf_len, Ordering :: Acquire ) ;
228+ . add ( output_buf_len, & [ ] ) ;
233229
234230 let writer_fut = Self :: calculate_range (
235231 & ( bytes_received..bytes_received + output_buf_len) ,
@@ -591,15 +587,13 @@ impl StoreDriver for FastSlowStore {
591587 // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
592588 // forwarding them up.
593589 if self . fast_store . has ( key. borrow ( ) ) . await ?. is_some ( ) {
594- self . metrics
595- . fast_store_hit_count
596- . fetch_add ( 1 , Ordering :: Acquire ) ;
590+ FAST_SLOW_STORE_METRICS . fast_store_hit_count . add ( 1 , & [ ] ) ;
597591 self . fast_store
598592 . get_part ( key, writer. borrow_mut ( ) , offset, length)
599593 . await ?;
600- self . metrics
594+ FAST_SLOW_STORE_METRICS
601595 . fast_store_downloaded_bytes
602- . fetch_add ( writer. get_bytes_written ( ) , Ordering :: Acquire ) ;
596+ . add ( writer. get_bytes_written ( ) , & [ ] ) ;
603597 return Ok ( ( ) ) ;
604598 }
605599
@@ -611,15 +605,13 @@ impl StoreDriver for FastSlowStore {
611605 || self . fast_direction == StoreDirection :: ReadOnly
612606 || self . fast_direction == StoreDirection :: Update
613607 {
614- self . metrics
615- . slow_store_hit_count
616- . fetch_add ( 1 , Ordering :: Acquire ) ;
608+ FAST_SLOW_STORE_METRICS . slow_store_hit_count . add ( 1 , & [ ] ) ;
617609 self . slow_store
618610 . get_part ( key, writer. borrow_mut ( ) , offset, length)
619611 . await ?;
620- self . metrics
612+ FAST_SLOW_STORE_METRICS
621613 . slow_store_downloaded_bytes
622- . fetch_add ( writer. get_bytes_written ( ) , Ordering :: Acquire ) ;
614+ . add ( writer. get_bytes_written ( ) , & [ ] ) ;
623615 return Ok ( ( ) ) ;
624616 }
625617
@@ -665,16 +657,4 @@ impl StoreDriver for FastSlowStore {
665657 }
666658}
667659
668- #[ derive( Debug , Default , MetricsComponent ) ]
669- struct FastSlowStoreMetrics {
670- #[ metric( help = "Hit count for the fast store" ) ]
671- fast_store_hit_count : AtomicU64 ,
672- #[ metric( help = "Downloaded bytes from the fast store" ) ]
673- fast_store_downloaded_bytes : AtomicU64 ,
674- #[ metric( help = "Hit count for the slow store" ) ]
675- slow_store_hit_count : AtomicU64 ,
676- #[ metric( help = "Downloaded bytes from the slow store" ) ]
677- slow_store_downloaded_bytes : AtomicU64 ,
678- }
679-
680660default_health_status_indicator ! ( FastSlowStore ) ;
0 commit comments