Skip to content

Commit a4ab0d6

Browse files
authored
fix: memory leak when reporter is not set (#120)
1 parent 929879e commit a4ab0d6

3 files changed

Lines changed: 95 additions & 55 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Fix memory leak when reporter is not set.
6+
57
## v0.7.12
68

79
- Propagate trace context no matter the reporter is set or not.

fastrace/src/collector/global_collector.rs

Lines changed: 91 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ use std::borrow::Cow;
44
use std::cell::UnsafeCell;
55
use std::collections::HashMap;
66
use std::sync::Arc;
7+
use std::sync::atomic::AtomicBool;
8+
use std::sync::atomic::AtomicU64;
79
use std::sync::atomic::AtomicUsize;
810
use std::sync::atomic::Ordering;
11+
use std::time::Duration;
912

1013
use fastant::Anchor;
1114
use fastant::Instant;
@@ -34,12 +37,15 @@ use crate::util::spsc::{self};
3437
static NEXT_COLLECT_ID: AtomicUsize = AtomicUsize::new(0);
3538
static GLOBAL_COLLECTOR: Mutex<Option<GlobalCollector>> = Mutex::new(None);
3639
static SPSC_RXS: Mutex<Vec<Receiver<CollectCommand>>> = Mutex::new(Vec::new());
40+
static REPORT_INTERVAL: AtomicU64 = AtomicU64::new(0);
41+
static REPORTER_READY: AtomicBool = AtomicBool::new(false);
3742

3843
pub const NOT_SAMPLED_COLLECT_ID: usize = usize::MAX;
44+
const CHANNEL_SIZE: usize = 10240;
3945

4046
thread_local! {
4147
static COMMAND_SENDER: UnsafeCell<Sender<CollectCommand>> = {
42-
let (tx, rx) = spsc::bounded(10240);
48+
let (tx, rx) = spsc::bounded(CHANNEL_SIZE);
4349
register_receiver(rx);
4450
UnsafeCell::new(tx)
4551
};
@@ -50,17 +56,29 @@ fn register_receiver(rx: Receiver<CollectCommand>) {
5056
}
5157

5258
fn send_command(cmd: CollectCommand) {
59+
if !reporter_ready() {
60+
return;
61+
}
62+
5363
COMMAND_SENDER
5464
.try_with(|sender| unsafe { (*sender.get()).send(cmd).ok() })
5565
.ok();
5666
}
5767

5868
fn force_send_command(cmd: CollectCommand) {
69+
if !reporter_ready() {
70+
return;
71+
}
72+
5973
COMMAND_SENDER
6074
.try_with(|sender| unsafe { (*sender.get()).force_send(cmd) })
6175
.ok();
6276
}
6377

78+
fn reporter_ready() -> bool {
79+
REPORTER_READY.load(Ordering::Relaxed)
80+
}
81+
6482
/// Sets the reporter and its configuration for the current application.
6583
///
6684
/// # Examples
@@ -186,6 +204,15 @@ enum SpanCollection {
186204
},
187205
}
188206

207+
impl SpanCollection {
208+
fn trace_id(&self) -> TraceId {
209+
match self {
210+
SpanCollection::Owned { trace_id, .. } => *trace_id,
211+
SpanCollection::Shared { trace_id, .. } => *trace_id,
212+
}
213+
}
214+
}
215+
189216
#[derive(Default)]
190217
struct ActiveCollector {
191218
span_collections: Vec<SpanCollection>,
@@ -209,37 +236,45 @@ pub(crate) struct GlobalCollector {
209236

210237
impl GlobalCollector {
211238
fn start(reporter: impl Reporter, config: Config) {
212-
let global_collector = GlobalCollector {
213-
config,
214-
reporter: Some(Box::new(reporter)),
215-
216-
active_collectors: HashMap::new(),
217-
218-
start_collects: vec![],
219-
drop_collects: vec![],
220-
commit_collects: vec![],
221-
submit_spans: vec![],
222-
stale_spans: vec![],
223-
};
224-
225-
*GLOBAL_COLLECTOR.lock() = Some(global_collector);
239+
REPORT_INTERVAL.store(config.report_interval.as_nanos() as u64, Ordering::Relaxed);
240+
REPORTER_READY.store(true, Ordering::Relaxed);
241+
242+
let mut global_collector = GLOBAL_COLLECTOR.lock();
243+
244+
if let Some(collector) = global_collector.as_mut() {
245+
collector.reporter = Some(Box::new(reporter));
246+
collector.config = config;
247+
} else {
248+
*global_collector = Some(GlobalCollector {
249+
config,
250+
reporter: Some(Box::new(reporter)),
251+
252+
active_collectors: HashMap::new(),
253+
254+
start_collects: vec![],
255+
drop_collects: vec![],
256+
commit_collects: vec![],
257+
submit_spans: vec![],
258+
stale_spans: vec![],
259+
});
226260

227-
#[cfg(not(target_family = "wasm"))]
228-
{
229-
std::thread::Builder::new()
230-
.name("fastrace-global-collector".to_string())
231-
.spawn(move || {
232-
loop {
233-
let begin_instant = Instant::now();
234-
GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands();
235-
std::thread::sleep(
236-
config
237-
.report_interval
238-
.saturating_sub(begin_instant.elapsed()),
239-
);
240-
}
241-
})
242-
.unwrap();
261+
#[cfg(not(target_family = "wasm"))]
262+
{
263+
std::thread::Builder::new()
264+
.name("fastrace-global-collector".to_string())
265+
.spawn(move || {
266+
loop {
267+
let begin_instant = Instant::now();
268+
GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands();
269+
let report_interval =
270+
Duration::from_nanos(REPORT_INTERVAL.load(Ordering::Relaxed));
271+
std::thread::sleep(
272+
report_interval.saturating_sub(begin_instant.elapsed()),
273+
);
274+
}
275+
})
276+
.unwrap();
277+
}
243278
}
244279
}
245280

@@ -349,7 +384,7 @@ impl GlobalCollector {
349384
for CommitCollect { collect_id } in commit_collects.drain(..) {
350385
if let Some(mut active_collector) = self.active_collectors.remove(&collect_id) {
351386
postprocess_span_collection(
352-
active_collector.span_collections,
387+
&active_collector.span_collections,
353388
&anchor,
354389
&mut committed_records,
355390
&mut active_collector.danglings,
@@ -360,17 +395,20 @@ impl GlobalCollector {
360395
if !self.config.tail_sampled {
361396
for active_collector in self.active_collectors.values_mut() {
362397
postprocess_span_collection(
363-
active_collector.span_collections.drain(..),
398+
&active_collector.span_collections,
364399
&anchor,
365400
&mut committed_records,
366401
&mut active_collector.danglings,
367402
);
403+
active_collector.span_collections.clear();
368404
}
369405
}
370406

371-
for spans in stale_spans.drain(..) {
407+
stale_spans.sort_by_key(|spans| spans.trace_id());
408+
409+
for spans in stale_spans.chunk_by(|a, b| a.trace_id() == b.trace_id()) {
372410
postprocess_span_collection(
373-
[spans],
411+
spans,
374412
&anchor,
375413
&mut committed_records,
376414
&mut HashMap::new(),
@@ -404,8 +442,8 @@ enum DanglingItem {
404442
Properties(Vec<(Cow<'static, str>, Cow<'static, str>)>),
405443
}
406444

407-
fn postprocess_span_collection(
408-
span_collections: impl IntoIterator<Item = SpanCollection>,
445+
fn postprocess_span_collection<'a>(
446+
span_collections: impl IntoIterator<Item = &'a SpanCollection>,
409447
anchor: &Anchor,
410448
committed_records: &mut Vec<SpanRecord>,
411449
danglings: &mut HashMap<SpanId, Vec<DanglingItem>>,
@@ -420,25 +458,25 @@ fn postprocess_span_collection(
420458
parent_id,
421459
} => match spans {
422460
SpanSet::Span(raw_span) => amend_span(
423-
&raw_span,
424-
trace_id,
425-
parent_id,
461+
raw_span,
462+
*trace_id,
463+
*parent_id,
426464
committed_records,
427465
danglings,
428466
anchor,
429467
),
430468
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
431-
&local_spans,
432-
trace_id,
433-
parent_id,
469+
local_spans,
470+
*trace_id,
471+
*parent_id,
434472
committed_records,
435473
danglings,
436474
anchor,
437475
),
438476
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
439-
&local_spans,
440-
trace_id,
441-
parent_id,
477+
local_spans,
478+
*trace_id,
479+
*parent_id,
442480
committed_records,
443481
danglings,
444482
anchor,
@@ -448,27 +486,27 @@ fn postprocess_span_collection(
448486
spans,
449487
trace_id,
450488
parent_id,
451-
} => match &*spans {
489+
} => match &**spans {
452490
SpanSet::Span(raw_span) => amend_span(
453491
raw_span,
454-
trace_id,
455-
parent_id,
492+
*trace_id,
493+
*parent_id,
456494
committed_records,
457495
danglings,
458496
anchor,
459497
),
460498
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
461499
local_spans,
462-
trace_id,
463-
parent_id,
500+
*trace_id,
501+
*parent_id,
464502
committed_records,
465503
danglings,
466504
anchor,
467505
),
468506
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
469507
local_spans,
470-
trace_id,
471-
parent_id,
508+
*trace_id,
509+
*parent_id,
472510
committed_records,
473511
danglings,
474512
anchor,

fastrace/src/collector/id.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ thread_local! {
1313
}
1414

1515
/// An identifier for a trace, which groups a set of related spans together.
16-
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)]
16+
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
1717
pub struct TraceId(pub u128);
1818

1919
impl TraceId {
@@ -61,7 +61,7 @@ impl<'de> serde::Deserialize<'de> for TraceId {
6161
}
6262

6363
/// An identifier for a span within a trace.
64-
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)]
64+
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
6565
pub struct SpanId(pub u64);
6666

6767
impl SpanId {

0 commit comments

Comments
 (0)