-
Notifications
You must be signed in to change notification settings - Fork 5.3k
Expand file tree
/
Copy paththread_aware_lb_impl.h
More file actions
199 lines (172 loc) · 8.68 KB
/
thread_aware_lb_impl.h
File metadata and controls
199 lines (172 loc) · 8.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#pragma once
#include <bitset>
#include "envoy/common/callback.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "source/common/common/logger.h"
#include "source/common/config/metadata.h"
#include "source/common/config/well_known_names.h"
#include "source/common/http/hash_policy.h"
#include "source/extensions/load_balancing_policies/common/load_balancer_impl.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
namespace Envoy {
namespace Upstream {
using NormalizedHostWeightVector = std::vector<std::pair<HostConstSharedPtr, double>>;
using NormalizedHostWeightMap = std::map<HostConstSharedPtr, double>;
using HashPolicyProto = envoy::config::route::v3::RouteAction::HashPolicy;
using HashPolicySharedPtr = std::shared_ptr<Http::HashPolicy>;
class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
public:
/**
* Base class for a hashing load balancer implemented for use in a thread aware load balancer.
* TODO(mattklein123): Currently only RingHash and Maglev use the thread aware load balancer.
* The hash is pre-computed prior to getting to the real load balancer for
* use in priority selection. In the future we likely we will want to pass
* through the full load balancer context in case a future implementation
* wants to use it.
*/
class HashingLoadBalancer {
public:
virtual ~HashingLoadBalancer() = default;
virtual HostSelectionResponse chooseHost(uint64_t hash, uint32_t attempt) const PURE;
const absl::string_view hashKey(HostConstSharedPtr host, bool use_hostname) const {
const Protobuf::Value& val = Config::Metadata::metadataValue(
host->metadata().get(), Config::MetadataFilters::get().ENVOY_LB,
Config::MetadataEnvoyLbKeys::get().HASH_KEY);
if (val.kind_case() != val.kStringValue && val.kind_case() != val.KIND_NOT_SET) {
FINE_GRAIN_LOG(debug, "", "hash_key must be string type, got: {}",
static_cast<int>(val.kind_case()));
}
absl::string_view hash_key = val.string_value();
if (hash_key.empty()) {
hash_key = use_hostname ? host->hostname() : host->address()->asString();
}
return hash_key;
}
};
using HashingLoadBalancerSharedPtr = std::shared_ptr<HashingLoadBalancer>;
/**
* Class for consistent hashing load balancer (CH-LB) with bounded loads.
* It is common to both RingHash and Maglev load balancers, because the logic of selecting the
* next host when one is overloaded is independent of the CH-LB type.
*/
class BoundedLoadHashingLoadBalancer : public HashingLoadBalancer {
public:
BoundedLoadHashingLoadBalancer(HashingLoadBalancerSharedPtr hashing_lb_ptr,
NormalizedHostWeightVector normalized_host_weights,
uint32_t hash_balance_factor)
: normalized_host_weights_map_(initNormalizedHostWeightMap(normalized_host_weights)),
hashing_lb_ptr_(std::move(hashing_lb_ptr)),
normalized_host_weights_(std::move(normalized_host_weights)),
hash_balance_factor_(hash_balance_factor) {
ASSERT(hashing_lb_ptr_ != nullptr);
ASSERT(hash_balance_factor > 0);
}
HostSelectionResponse chooseHost(uint64_t hash, uint32_t attempt) const override;
protected:
virtual double hostOverloadFactor(const Host& host, double weight) const;
const NormalizedHostWeightMap normalized_host_weights_map_;
private:
const NormalizedHostWeightMap
initNormalizedHostWeightMap(const NormalizedHostWeightVector& normalized_host_weights) {
NormalizedHostWeightMap normalized_host_weights_map;
for (auto const& item : normalized_host_weights) {
normalized_host_weights_map[item.first] = item.second;
}
return normalized_host_weights_map;
}
const HashingLoadBalancerSharedPtr hashing_lb_ptr_;
const NormalizedHostWeightVector normalized_host_weights_;
const uint32_t hash_balance_factor_;
};
// Upstream::ThreadAwareLoadBalancer
LoadBalancerFactorySharedPtr factory() override { return factory_; }
absl::Status initialize() override;
// Upstream::LoadBalancer
HostSelectionResponse chooseHost(LoadBalancerContext*) override { return {nullptr}; }
// Preconnect not implemented for hash based load balancing
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
// Pool selection not implemented.
absl::optional<Upstream::SelectedPoolAndConnection>
selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
// Lifetime tracking not implemented.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}
protected:
ThreadAwareLoadBalancerBase(const PrioritySet& priority_set, ClusterLbStats& stats,
Runtime::Loader& runtime, Random::RandomGenerator& random,
uint32_t healthy_panic_threshold, bool locality_weighted_balancing,
HashPolicySharedPtr hash_policy)
: LoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold),
factory_(new LoadBalancerFactoryImpl(stats, random, std::move(hash_policy))),
locality_weighted_balancing_(locality_weighted_balancing) {}
private:
struct PerPriorityState {
std::shared_ptr<HashingLoadBalancer> current_lb_;
bool global_panic_{};
};
using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>;
struct LoadBalancerImpl : public LoadBalancer {
LoadBalancerImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
HashPolicySharedPtr hash_policy)
: stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
// Upstream::LoadBalancer
HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
// Preconnect not implemented for hash based load balancing
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
absl::optional<Upstream::SelectedPoolAndConnection>
selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}
ClusterLbStats& stats_;
Random::RandomGenerator& random_;
HashPolicySharedPtr hash_policy_;
std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_;
std::shared_ptr<HealthyLoad> healthy_per_priority_load_;
std::shared_ptr<DegradedLoad> degraded_per_priority_load_;
};
struct LoadBalancerFactoryImpl : public LoadBalancerFactory {
LoadBalancerFactoryImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
std::shared_ptr<Http::HashPolicy> hash_policy)
: stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
// Upstream::LoadBalancerFactory
// Ignore the params for the thread-aware LB.
LoadBalancerPtr create(LoadBalancerParams) override;
ClusterLbStats& stats_;
Random::RandomGenerator& random_;
std::shared_ptr<Http::HashPolicy> hash_policy_;
absl::Mutex mutex_;
std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_ ABSL_GUARDED_BY(mutex_);
// This is split out of PerPriorityState so LoadBalancerBase::ChoosePriority can be reused.
std::shared_ptr<HealthyLoad> healthy_per_priority_load_ ABSL_GUARDED_BY(mutex_);
std::shared_ptr<DegradedLoad> degraded_per_priority_load_ ABSL_GUARDED_BY(mutex_);
};
virtual HashingLoadBalancerSharedPtr
createLoadBalancer(const NormalizedHostWeightVector& normalized_host_weights,
double min_normalized_weight, double max_normalized_weight) PURE;
void refresh();
std::shared_ptr<LoadBalancerFactoryImpl> factory_;
const bool locality_weighted_balancing_{};
Common::CallbackHandlePtr priority_update_cb_;
Common::CallbackHandlePtr member_update_cb_;
};
class TypedHashLbConfigBase : public LoadBalancerConfig {
public:
TypedHashLbConfigBase() = default;
TypedHashLbConfigBase(absl::Span<const HashPolicyProto* const> hash_policy,
Regex::Engine& regex_engine, absl::Status& creation_status);
absl::Status validateEndpoints(const PriorityState& priorities) const override;
HashPolicySharedPtr hash_policy_;
};
} // namespace Upstream
} // namespace Envoy