Skip to content

Commit 5b0a6c1

Browse files
zhengJadejiazheng.jia
andauthored
epoll bthread deal first (#2819)
* add global priority queue * add parking lot license * fix scale workers * fix expected state * add global priority queue * fix setconcurrency ut error * fix rebase error * add global priority bthread flags * deal some comments * fix comments --------- Co-authored-by: jiazheng.jia <jiazheng.jia@antgroup.com>
1 parent 74ca596 commit 5b0a6c1

9 files changed

Lines changed: 43 additions & 8 deletions

src/brpc/event_dispatcher_epoll.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
2626
: _event_dispatcher_fd(-1)
2727
, _stop(false)
2828
, _tid(0)
29-
, _thread_attr(BTHREAD_ATTR_NORMAL) {
29+
, _thread_attr(BTHREAD_ATTR_EPOLL) {
3030
_event_dispatcher_fd = epoll_create(1024 * 1024);
3131
if (_event_dispatcher_fd < 0) {
3232
PLOG(FATAL) << "Fail to create epoll";
@@ -69,8 +69,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
6969

7070
// Set _thread_attr before creating epoll thread to make sure
7171
// everyting seems sane to the thread.
72-
_thread_attr = consumer_thread_attr ?
73-
*consumer_thread_attr : BTHREAD_ATTR_NORMAL;
72+
if (consumer_thread_attr) {
73+
_thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
74+
}
7475

7576
//_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
7677
// that created by epoll_wait() never to quit.

src/bthread/parking_lot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
7070
_pending_signal.fetch_or(1);
7171
futex_wake_private(&_pending_signal, 10000);
7272
}
73+
7374
private:
7475
// higher 31 bits for signalling, LSB for stopping.
7576
butil::atomic<int> _pending_signal;

src/bthread/task_control.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ TaskControl::TaskControl()
183183
, _signal_per_second(&_cumulated_signal_count)
184184
, _status(print_rq_sizes_in_the_tc, this)
185185
, _nbthreads("bthread_count")
186+
, _priority_queues(FLAGS_task_group_ntags)
186187
, _pl(FLAGS_task_group_ntags)
187188
{}
188189

@@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) {
207208
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
208209
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
209210
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
211+
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
212+
LOG(FATAL) << "Fail to init _priority_q";
213+
return -1;
214+
}
210215
}
211216

212217
// Make sure TimerThread is ready.
@@ -429,6 +434,11 @@ int TaskControl::_destroy_group(TaskGroup* g) {
429434

430435
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
431436
auto tag = tls_task_group->tag();
437+
438+
if (_priority_queues[tag].steal(tid)) {
439+
return true;
440+
}
441+
432442
// 1: Acquiring fence is paired with releasing fence in _add_group to
433443
// avoid accessing uninitialized slot of _groups.
434444
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);

src/bthread/task_control.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ friend bthread_t init_for_pthread_stack_trace();
9797
std::string stack_trace(bthread_t tid);
9898
#endif // BRPC_BTHREAD_TRACER
9999

100+
void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
101+
_priority_queues[tag].push(tid);
102+
}
103+
100104
private:
101105
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
102106
static const int PARKING_LOT_NUM = 4;
@@ -153,6 +157,7 @@ friend bthread_t init_for_pthread_stack_trace();
153157
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
154158
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
155159
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
160+
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
156161

157162
std::vector<TaggedParkingLot> _pl;
158163

src/bthread/task_group.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -482,12 +482,19 @@ int TaskGroup::start_foreground(TaskGroup** pg,
482482
} else {
483483
// NOSIGNAL affects current task, not the new task.
484484
RemainedFn fn = NULL;
485-
if (g->current_task()->about_to_quit) {
485+
auto& cur_attr = g->_cur_meta->attr;
486+
if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
487+
fn = priority_to_run;
488+
} else if (g->current_task()->about_to_quit) {
486489
fn = ready_to_run_in_worker_ignoresignal;
487490
} else {
488491
fn = ready_to_run_in_worker;
489492
}
490-
ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) };
493+
ReadyToRunArgs args = {
494+
g->tag(),
495+
g->_cur_meta,
496+
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
497+
};
491498
g->set_remained(fn, &args);
492499
TaskGroup::sched_to(pg, m->tid);
493500
}
@@ -861,6 +868,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
861868
return tls_task_group->push_rq(args->meta->tid);
862869
}
863870

871+
void TaskGroup::priority_to_run(void* args_in) {
872+
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
873+
return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
874+
}
875+
864876
struct SleepArgs {
865877
uint64_t timeout_us;
866878
bthread_t tid;
@@ -1035,7 +1047,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
10351047

10361048
void TaskGroup::yield(TaskGroup** pg) {
10371049
TaskGroup* g = *pg;
1038-
ReadyToRunArgs args = { g->_cur_meta, false };
1050+
ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
10391051
g->set_remained(ready_to_run_in_worker, &args);
10401052
sched(pg);
10411053
}

src/bthread/task_group.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,13 @@ friend class TaskControl;
221221
static void _release_last_context(void*);
222222
static void _add_sleep_event(void*);
223223
struct ReadyToRunArgs {
224+
bthread_tag_t tag;
224225
TaskMeta* meta;
225226
bool nosignal;
226227
};
227228
static void ready_to_run_in_worker(void*);
228229
static void ready_to_run_in_worker_ignoresignal(void*);
230+
static void priority_to_run(void*);
229231

230232
// Wait for a task to run.
231233
// Returns true on success, false is treated as permanent error and the

src/bthread/task_group_inl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* next_meta) {
5151
if (g->is_current_pthread_task()) {
5252
return g->ready_to_run(next_meta);
5353
}
54-
ReadyToRunArgs args = { g->_cur_meta, false };
54+
ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
5555
g->set_remained((g->current_task()->about_to_quit
5656
? ready_to_run_in_worker_ignoresignal
5757
: ready_to_run_in_worker),

src/bthread/types.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH = 16;
5353
static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32;
5454
static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64;
5555
static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128;
56+
static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256;
5657

5758
// Key of thread-local data, created by bthread_key_create.
5859
typedef struct {
@@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0,
137138
static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL,
138139
BTHREAD_TAG_INVALID};
139140

141+
// epoll bthread
142+
static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
143+
BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, BTHREAD_TAG_INVALID};
144+
140145
// bthreads created with this attribute will print log when it's started,
141146
// context-switched, finished.
142147
static const bthread_attr_t BTHREAD_ATTR_DEBUG = {

test/bthread_setconcurrency_unittest.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ int concurrency_by_tag(int num) {
214214

215215
TEST(BthreadTest, concurrency_by_tag) {
216216
ASSERT_EQ(concurrency_by_tag(1), false);
217-
auto tag_con = bthread_getconcurrency_by_tag(0);
218217
auto con = bthread_getconcurrency();
219218
ASSERT_EQ(concurrency_by_tag(con), true);
220219
ASSERT_EQ(concurrency_by_tag(con + 1), true);

0 commit comments

Comments
 (0)