Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libcaf_core/caf/actor_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,12 @@ class actor_system::impl {
}
// Make sure we have a clock.
if (!clock) {
clock = std::make_unique<actor_clock_impl>(*parent);
auto& factory = cfg.get_clock_factory();
if (factory) {
clock = factory(*parent);
} else {
clock = std::make_unique<actor_clock_impl>(*parent);
}
}
// Make sure we have a scheduler up and running.
if (!scheduler) {
Expand Down
11 changes: 6 additions & 5 deletions libcaf_core/caf/actor_system.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "caf/callback.hpp"
#include "caf/detail/core_export.hpp"
#include "caf/detail/format.hpp"
#include "caf/detail/init_fun_factory.hpp"
#include "caf/detail/set_thread_name.hpp"
#include "caf/detail/spawn_fwd.hpp"
#include "caf/detail/spawnable.hpp"
Expand All @@ -22,7 +23,6 @@
#include "caf/make_actor.hpp"
#include "caf/prohibit_top_level_spawn_marker.hpp"
#include "caf/spawn_options.hpp"
#include "caf/stateful_actor.hpp"
#include "caf/string_algorithms.hpp"
#include "caf/telemetry/actor_metrics.hpp"
#include "caf/term.hpp"
Expand All @@ -31,6 +31,7 @@

#include <cstddef>
#include <memory>
#include <span>
#include <string>
#include <thread>

Expand Down Expand Up @@ -347,10 +348,10 @@ class CAF_CORE_EXPORT actor_system {
requires(is_unbound(Os))
infer_handle_from_fun_t<F>
spawn_functor(std::true_type, actor_config& cfg, F& fun, Ts&&... xs) {
using base = infer_impl_from_fun_t<F>;
using state = detail::functor_state<base>;
using impl = stateful_actor<state, base>;
return spawn_class<impl, Os>(cfg, std::move(fun), std::forward<Ts>(xs)...);
using impl = infer_impl_from_fun_t<F>;
detail::init_fun_factory<impl, F> fac;
cfg.init_fun = fac(std::move(fun), std::forward<Ts>(xs)...);
return spawn_impl<impl, Os>(cfg);
}

/// Fallback no-op overload.
Expand Down
13 changes: 13 additions & 0 deletions libcaf_core/caf/actor_system_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "caf/actor_system_config.hpp"

#include "caf/actor_clock.hpp"
#include "caf/config.hpp"
#include "caf/config_option.hpp"
#include "caf/config_option_adder.hpp"
Expand Down Expand Up @@ -101,6 +102,8 @@ struct actor_system_config::fields {
exception_handler_type exception_handler
= scheduled_actor::default_exception_handler;
#endif // CAF_ENABLE_EXCEPTIONS
std::function<auto(actor_system&)->std::unique_ptr<actor_clock>>
clock_factory;
};

// -- constructors, destructors, and assignment operators ----------------------
Expand Down Expand Up @@ -619,6 +622,16 @@ void actor_system_config::print_content() const {
std::cout << std::endl;
}

void actor_system_config::set_clock_factory(
std::function<auto(actor_system&)->std::unique_ptr<actor_clock>> clock) {
fields_->clock_factory = std::move(clock);
}

auto actor_system_config::get_clock_factory() const
-> std::function<auto(actor_system&)->std::unique_ptr<actor_clock>>& {
return fields_->clock_factory;
}

// -- module factories ---------------------------------------------------------

void actor_system_config::add_module_factory(module_factory_fn ptr) {
Expand Down
6 changes: 6 additions & 0 deletions libcaf_core/caf/actor_system_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ class CAF_CORE_EXPORT actor_system_config {
/// @private
void print_content() const;

void set_clock_factory(
std::function<auto(actor_system&)->std::unique_ptr<actor_clock>> clock);

auto get_clock_factory() const
-> std::function<auto(actor_system&)->std::unique_ptr<actor_clock>>&;

protected:
config_option_set custom_options_;

Expand Down
6 changes: 0 additions & 6 deletions libcaf_core/caf/actor_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,4 @@ template <class T>
struct actor_traits
: default_actor_traits<T, std::is_base_of_v<abstract_actor, T>> {};

template <class T>
concept blocking_actor_type = actor_traits<T>::is_blocking;

template <class T>
concept non_blocking_actor_type = actor_traits<T>::is_non_blocking;

} // namespace caf
3 changes: 3 additions & 0 deletions libcaf_core/caf/attachable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class CAF_CORE_EXPORT attachable {
/// Identifies `default_attachable::observe_token`.
static constexpr size_t observer = 2;

/// Identifies `detail::monitor_token`.
static constexpr size_t monitor = 3;

template <class T>
token(const T& tk) : subtype(T::token_type), ptr(&tk) {
// nop
Expand Down
5 changes: 0 additions & 5 deletions libcaf_core/caf/behavior.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ class CAF_CORE_EXPORT behavior {
// nop
}

// Convenience overload to allow "unsafe" initialization of any behavior_type.
explicit behavior(unsafe_behavior_init_t) {
// nop
}

/// Creates a behavior from `fun` without timeout.
behavior(const message_handler& mh);

Expand Down
3 changes: 2 additions & 1 deletion libcaf_core/caf/blocking_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ void blocking_actor::await_all_other_actors_done() {

void blocking_actor::act() {
auto lg = log::core::trace("");
// Default implementation does nothing.
if (initial_behavior_fac_)
initial_behavior_fac_(this);
}

void blocking_actor::fail_state(error err) {
Expand Down
53 changes: 53 additions & 0 deletions libcaf_core/caf/detail/monitor_attachable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/main/LICENSE.

#pragma once

#include "caf/action.hpp"
#include "caf/actor.hpp"
#include "caf/actor_addr.hpp"
#include "caf/actor_cast.hpp"
#include "caf/attachable.hpp"
#include "caf/detail/monitor_action.hpp"
#include "caf/mailbox_element.hpp"

namespace caf::detail {

/// Token used to identify and remove a `monitor_attachable` via `detach()`.
struct monitor_token {
abstract_monitor_action* key;
static constexpr size_t token_type = attachable::token::monitor;
};

/// Attachable placed on the monitored actor. Unlike `functor_attachable` it
/// carries a matchable token so it can be removed via `abstract_actor::detach()`
/// when the monitoring actor exits or cancels the monitor.
class monitor_attachable : public attachable {
public:
static constexpr size_t token_type = attachable::token::monitor;

monitor_attachable(abstract_monitor_action_ptr on_down, actor_addr self)
: on_down_(std::move(on_down)), self_(std::move(self)) {}

void actor_exited(const error& reason, scheduler* sched) override {
if (on_down_->set_reason(error{reason})) {
if (auto shdl = actor_cast<actor>(self_))
shdl->enqueue(
make_mailbox_element(nullptr, make_message_id(), action{on_down_}),
sched);
}
}

bool matches(const token& what) override {
if (what.subtype != token_type)
return false;
return static_cast<const monitor_token*>(what.ptr)->key == on_down_.get();
}

private:
abstract_monitor_action_ptr on_down_;
actor_addr self_;
};

} // namespace caf::detail
8 changes: 7 additions & 1 deletion libcaf_core/caf/local_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ void local_actor::demonitor(const node_id& node) {
}

void local_actor::do_monitor(abstract_actor* ptr, message_priority priority) {
if (ptr != nullptr)
if (ptr != nullptr) {
ptr->attach(
default_attachable::make_monitor(ptr->address(), address(), priority));
monitored_actors_.emplace_back(ptr->ctrl(), add_ref);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weak_actor_ptrs keep the control blocks that are pointed to alive, so we need to remove entries from this map non-lazily. I think that needs to be done in do_demonitor(), but also when a down_msg (+ node_down_msg?) is processed in scheduled actors. For completeness we might also want to do the same in blocking actors.

}
}

void local_actor::do_demonitor(const strong_actor_ptr& whom) {
Expand Down Expand Up @@ -137,6 +139,10 @@ void local_actor::on_cleanup([[maybe_unused]] const error& reason) {
if (auto* running_count = metrics_.running_count) {
running_count->dec();
}
for (auto& weak : monitored_actors_)
if (auto ptr = weak.lock())
do_demonitor(ptr);
monitored_actors_.clear();
on_exit();
CAF_LOG_TERMINATE_EVENT(this, reason);
}
Expand Down
5 changes: 5 additions & 0 deletions libcaf_core/caf/local_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <cstdint>
#include <type_traits>
#include <utility>
#include <vector>

namespace caf {

Expand Down Expand Up @@ -382,6 +383,10 @@ class CAF_CORE_EXPORT local_actor : public abstract_actor {
/// Stores the metrics for this actor.
telemetry::actor_metrics metrics_;

/// Tracks actors monitored via the non-callback monitor() API so they can
/// be demonitored automatically when this actor exits.
std::vector<weak_actor_ptr> monitored_actors_;

private:
virtual void do_unstash(mailbox_element_ptr ptr) = 0;
};
Expand Down
73 changes: 63 additions & 10 deletions libcaf_core/caf/scheduled_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "caf/config.hpp"
#include "caf/defaults.hpp"
#include "caf/detail/assert.hpp"
#include "caf/detail/monitor_attachable.hpp"
#include "caf/detail/critical.hpp"
#include "caf/detail/current_actor.hpp"
#include "caf/detail/default_invoke_result_visitor.hpp"
Expand All @@ -30,6 +31,51 @@

using namespace std::string_literals;

namespace {

/// Disposable returned from do_monitor. Calling dispose() both cancels the
/// monitor_action and removes its monitor_attachable from the monitored actor,
/// so the monitored actor's attachable list never accumulates stale entries.
class monitor_disposable_impl : public caf::ref_counted,
public caf::disposable::impl {
public:
monitor_disposable_impl(caf::detail::abstract_monitor_action_ptr on_down,
caf::weak_actor_ptr monitored)
: on_down_(std::move(on_down)), monitored_(std::move(monitored)) {}

void dispose() override {
on_down_->dispose();
if (auto ptr = monitored_.lock())
ptr->get()->detach(caf::detail::monitor_token{on_down_.get()});
}

bool disposed() const noexcept override {
return on_down_->disposed();
}

void ref_disposable() const noexcept override {
ref();
}

void deref_disposable() const noexcept override {
deref();
}

friend void intrusive_ptr_add_ref(const monitor_disposable_impl* p) noexcept {
p->ref();
}

friend void intrusive_ptr_release(const monitor_disposable_impl* p) noexcept {
p->deref();
}

private:
caf::detail::abstract_monitor_action_ptr on_down_;
caf::weak_actor_ptr monitored_;
};

} // namespace

namespace caf {

// -- related free functions ---------------------------------------------------
Expand Down Expand Up @@ -246,6 +292,11 @@ void scheduled_actor::on_cleanup(const error& reason) {
// Shutdown hosting thread when running detached.
if (private_thread_)
home_system().release_private_thread(private_thread_);
// Cancel any monitors this actor set up to avoid leaving stale attachables
// in the monitored actors' lists.
for (auto& d : active_monitors_)
d.dispose();
active_monitors_.clear();
// Clear state for open requests, flows and streams.
awaited_responses_.clear();
multiplexed_responses_.clear();
Expand Down Expand Up @@ -324,6 +375,10 @@ void scheduled_actor::resume(scheduler* sched, uint64_t event_id) {
// time's up
log::core::debug("max throughput reached: resume later");
intrusive_ptr_add_ref(ctrl());
if (private_thread_ != nullptr) {
private_thread_->resume(this);
return;
}
sched->delay(this, resumable::default_event_id);
}

Expand Down Expand Up @@ -1135,6 +1190,7 @@ void scheduled_actor::update_watched_disposables() {
log::core::debug("now watching {} disposables",
watched_disposables_.size());
}
disposable::erase_disposed(active_monitors_);
}

void scheduled_actor::register_flow_state(uint64_t local_id,
Expand Down Expand Up @@ -1215,16 +1271,13 @@ scheduled_actor::do_monitor(abstract_actor* ptr,
detail::abstract_monitor_action_ptr on_down) {
if (ptr == nullptr)
return {};
ptr->attach_functor([self = address(), on_down](error reason) {
// Failing to set the arg means the action was disposed.
if (on_down->set_reason(std::move(reason))) {
if (auto shdl = actor_cast<actor>(self))
shdl->enqueue(make_mailbox_element(nullptr, make_message_id(),
action{on_down}),
nullptr);
}
});
return on_down->as_disposable();
ptr->attach(attachable_ptr{
new detail::monitor_attachable(on_down, address())});
auto d = disposable{
make_counted<monitor_disposable_impl>(on_down,
actor_cast<weak_actor_ptr>(ptr))};
active_monitors_.push_back(d);
return d;
}

} // namespace caf
5 changes: 5 additions & 0 deletions libcaf_core/caf/scheduled_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,11 @@ class CAF_CORE_EXPORT scheduled_actor : public abstract_scheduled_actor,
/// terminating.
std::vector<disposable> watched_disposables_;

/// Stores disposables for active monitors set up by this actor. Disposed
/// automatically in on_cleanup() so stale entries never accumulate on the
/// monitored actors' attachable lists.
std::vector<disposable> active_monitors_;

/// Stores open streams that other actors may access. An actor is considered
/// alive as long as it has open streams.
std::unordered_map<uint64_t, stream_source_state> stream_sources_;
Expand Down
Loading
Loading