Skip to content

Commit 637a986

Browse files
committed
Add flatbuffers protocol support for brpc
Key components: - Protocol Implementation * Added flatbuffers_protocol.cpp/h for parsing, serialization and request/response handling. * Registered "fb_rpc" protocol in global.cpp - Core Infrastructure Updates * Channel: Added FBCallMethod and CallMethodInternal<bool is_pb> template for dual pb/fb support. * Controller: Added _fb_method, _fb_response fields and is_use_flatbuffer() method for flatbuffers state tracking. * Server: Added FlatBuffersMethodProperty/ServiceProperty structures and AddService() overloads for flatbuffers::Service registration. - Protocol Handler Refactoring * Updated all protocol handlers (baidu_rpc, http, http2, redis, etc.) to use generic void* pointers instead of google::protobuf types, enabling protocol-agnostic message handling.
1 parent 39a3436 commit 637a986

46 files changed

Lines changed: 907 additions & 84 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/brpc/channel.cpp

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,18 @@ static void HandleBackupRequest(void* arg) {
430430
bthread_id_error(correlation_id, EBACKUPREQUEST);
431431
}
432432

433-
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
434-
google::protobuf::RpcController* controller_base,
435-
const google::protobuf::Message* request,
436-
google::protobuf::Message* response,
437-
google::protobuf::Closure* done) {
433+
template <bool is_pb>
434+
void Channel::CallMethodInternal(const typename std::conditional<is_pb,
435+
google::protobuf::MethodDescriptor,
436+
brpc::flatbuffers::MethodDescriptor>::type* method,
437+
google::protobuf::RpcController* controller_base,
438+
const typename std::conditional<is_pb,
439+
google::protobuf::Message,
440+
brpc::flatbuffers::Message>::type* request,
441+
typename std::conditional<is_pb,
442+
google::protobuf::Message,
443+
brpc::flatbuffers::Message>::type* response,
444+
google::protobuf::Closure* done) {
438445
const int64_t start_send_real_us = butil::gettimeofday_us();
439446
Controller* cntl = static_cast<Controller*>(controller_base);
440447
cntl->OnRPCBegin(start_send_real_us);
@@ -492,22 +499,36 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
492499

493500
if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
494501
const int64_t start_send_us = butil::cpuwide_time_us();
495-
std::string method_name;
502+
const std::string* method_name = NULL;
496503
if (_get_method_name) {
497-
method_name = butil::EnsureString(_get_method_name(method, cntl));
504+
if (is_pb) {
505+
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
506+
method_name = &_get_method_name(pb_method, cntl);
507+
} else {
508+
// FlatBuffers doesn't support _get_method_name yet
509+
method_name = NULL;
510+
}
498511
} else if (method) {
499-
method_name = butil::EnsureString(method->full_name());
512+
if (is_pb) {
513+
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
514+
method_name = &pb_method->full_name();
515+
} else {
516+
auto fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
517+
method_name = &fb_method->full_name();
518+
}
500519
} else {
501520
const static std::string NULL_METHOD_STR = "null-method";
502-
method_name = NULL_METHOD_STR;
521+
method_name = &NULL_METHOD_STR;
522+
}
523+
if (method_name) {
524+
Span* span = Span::CreateClientSpan(
525+
*method_name, start_send_real_us - start_send_us);
526+
span->set_log_id(cntl->log_id());
527+
span->set_base_cid(correlation_id);
528+
span->set_protocol(_options.protocol);
529+
span->set_start_send_us(start_send_us);
530+
cntl->_span = span;
503531
}
504-
Span* span = Span::CreateClientSpan(
505-
method_name, start_send_real_us - start_send_us);
506-
span->set_log_id(cntl->log_id());
507-
span->set_base_cid(correlation_id);
508-
span->set_protocol(_options.protocol);
509-
span->set_start_send_us(start_send_us);
510-
cntl->_span = span;
511532
}
512533
// Override some options if they haven't been set by Controller
513534
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
@@ -525,11 +546,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
525546
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
526547
cntl->set_connection_type(_options.connection_type);
527548
}
528-
cntl->_response = response;
549+
529550
cntl->_done = done;
530551
cntl->_pack_request = _pack_request;
531-
cntl->_method = method;
532552
cntl->_auth = _options.auth;
553+
// Use reinterpret_cast to avoid template instantiation errors
554+
// The actual type is guaranteed by the is_pb parameter
555+
if (is_pb) {
556+
cntl->_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
557+
cntl->_response = reinterpret_cast<google::protobuf::Message*>(response);
558+
} else {
559+
cntl->_fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
560+
cntl->_fb_response = reinterpret_cast<brpc::flatbuffers::Message*>(response);
561+
cntl->set_use_flatbuffer();
562+
}
533563

534564
if (SingleServer()) {
535565
cntl->_single_server_id = _server_id;
@@ -615,6 +645,22 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
615645
}
616646
}
617647

648+
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
649+
google::protobuf::RpcController* controller_base,
650+
const google::protobuf::Message* request,
651+
google::protobuf::Message* response,
652+
google::protobuf::Closure* done) {
653+
CallMethodInternal<true>(method, controller_base, request, response, done);
654+
}
655+
656+
void Channel::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
657+
google::protobuf::RpcController* controller_base,
658+
const brpc::flatbuffers::Message* request,
659+
brpc::flatbuffers::Message* response,
660+
google::protobuf::Closure* done) {
661+
CallMethodInternal<false>(method, controller_base, request, response, done);
662+
}
663+
618664
void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const {
619665
os << "Channel[";
620666
if (SingleServer()) {
@@ -644,4 +690,24 @@ int Channel::CheckHealth() {
644690
}
645691
}
646692

693+
// CallMethodInternal instance for pb and fb
694+
template
695+
void Channel::CallMethodInternal<true>(
696+
const google::protobuf::MethodDescriptor* method,
697+
google::protobuf::RpcController* controller_base,
698+
const google::protobuf::Message* request,
699+
google::protobuf::Message* response,
700+
google::protobuf::Closure* done
701+
);
702+
703+
// CallMethodInternal instance for pb and fb
704+
template
705+
void Channel::CallMethodInternal<false>(
706+
const brpc::flatbuffers::MethodDescriptor* method,
707+
google::protobuf::RpcController* controller_base,
708+
const brpc::flatbuffers::Message* request,
709+
brpc::flatbuffers::Message* response,
710+
google::protobuf::Closure* done
711+
);
712+
647713
} // namespace brpc

src/brpc/channel.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "brpc/backup_request_policy.h"
3838
#include "brpc/naming_service_filter.h"
3939
#include "brpc/health_check_option.h"
40+
#include "brpc/details/flatbuffers_impl.h" // flatbuffers
4041

4142
namespace brpc {
4243

@@ -163,7 +164,8 @@ struct ChannelOptions {
163164
// channel.Init("bns://rdev.matrix.all", "rr", NULL/*default options*/);
164165
// MyService_Stub stub(&channel);
165166
// stub.MyMethod(&controller, &request, &response, NULL);
166-
class Channel : public ChannelBase {
167+
class Channel : public ChannelBase,
168+
public brpc::flatbuffers::RpcChannel {
167169
friend class Controller;
168170
friend class SelectiveChannel;
169171
public:
@@ -212,6 +214,11 @@ friend class SelectiveChannel;
212214
google::protobuf::Message* response,
213215
google::protobuf::Closure* done);
214216

217+
void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
218+
google::protobuf::RpcController* controller_base,
219+
const brpc::flatbuffers::Message* request,
220+
brpc::flatbuffers::Message* response,
221+
google::protobuf::Closure* done);
215222
// Get current options.
216223
const ChannelOptions& options() const { return _options; }
217224

@@ -239,6 +246,19 @@ friend class SelectiveChannel;
239246
const ChannelOptions* options,
240247
int raw_port = -1);
241248

249+
template <bool is_pb>
250+
inline void CallMethodInternal(const typename std::conditional<is_pb,
251+
google::protobuf::MethodDescriptor,
252+
brpc::flatbuffers::MethodDescriptor>::type* method,
253+
google::protobuf::RpcController* controller_base,
254+
const typename std::conditional<is_pb,
255+
google::protobuf::Message,
256+
brpc::flatbuffers::Message>::type* request,
257+
typename std::conditional<is_pb,
258+
google::protobuf::Message,
259+
brpc::flatbuffers::Message>::type* response,
260+
google::protobuf::Closure* done);
261+
242262
std::string _service_name;
243263
std::string _scheme;
244264
butil::EndPoint _server_address;

src/brpc/channel_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "butil/logging.h"
2525
#include <google/protobuf/service.h> // google::protobuf::RpcChannel
2626
#include "brpc/describable.h"
27+
#include "brpc/details/flatbuffers_common.h"
2728

2829
// To brpc developers: This is a header included by user, don't depend
2930
// on internal structures, use opaque pointers instead.

src/brpc/controller.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ void Controller::ResetPods() {
285285
_accessed = NULL;
286286
_pack_request = NULL;
287287
_method = NULL;
288+
_fb_method = NULL;
288289
_auth = NULL;
289290
_idl_names = idl_single_req_single_res;
290291
_idl_result = IDL_VOID_RESULT;
@@ -1200,7 +1201,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
12001201
// Make request
12011202
butil::IOBuf packet;
12021203
SocketMessage* user_packet = NULL;
1203-
_pack_request(&packet, &user_packet, cid.value, _method, this,
1204+
const void *method_desc = is_use_flatbuffer()? (const void*)_fb_method :
1205+
(const void*)_method;
1206+
_pack_request(&packet, &user_packet, cid.value, method_desc, this,
12041207
_request_buf, using_auth);
12051208
// TODO: PackRequest may accept SocketMessagePtr<>?
12061209
SocketMessagePtr<> user_packet_guard(user_packet);

src/brpc/controller.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "brpc/grpc.h"
4848
#include "brpc/kvmap.h"
4949
#include "brpc/rpc_dump.h"
50+
#include "brpc/details/flatbuffers_common.h"
5051

5152
// EAUTH is defined in MAC
5253
#ifndef EAUTH
@@ -151,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
151152
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
152153
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
153154
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
155+
static const uint32_t FLAGS_USE_FLATBUFFER = (1 << 23);
154156

155157
public:
156158
struct Inheritable {
@@ -220,6 +222,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
220222
// Response of the RPC call (passed to CallMethod)
221223
google::protobuf::Message* response() const { return _response; }
222224

225+
brpc::flatbuffers::Message* fb_response() const { return _fb_response; }
226+
223227
// An identifier to send to server along with request. This is widely used
224228
// throughout baidu's servers to tag a searching session (a series of
225229
// queries following the topology of servers) with a same log_id.
@@ -292,6 +296,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
292296
// Get the called method. May-be NULL for non-pb services.
293297
const google::protobuf::MethodDescriptor* method() const { return _method; }
294298

299+
const brpc::flatbuffers::MethodDescriptor* fb_method() const { return _fb_method; }
295300
// Get the controllers for accessing sub channels in combo channels.
296301
// Ordinary channel:
297302
// sub_count() is 0 and sub() is always NULL.
@@ -649,6 +654,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
649654
// the received time of RPC is not recorded in the controller.
650655
int64_t get_rpc_received_us() const { return _rpc_received_us; }
651656

657+
void set_use_flatbuffer() { add_flag(FLAGS_USE_FLATBUFFER); }
658+
bool is_use_flatbuffer() const { return has_flag(FLAGS_USE_FLATBUFFER); }
659+
652660
private:
653661
struct CompletionInfo {
654662
CallId id; // call_id of the corresponding request
@@ -860,6 +868,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
860868
Inheritable _inheritable;
861869
int _pchan_sub_count;
862870
google::protobuf::Message* _response;
871+
brpc::flatbuffers::Message* _fb_response;
863872
google::protobuf::Closure* _done;
864873
RPCSender* _sender;
865874
uint64_t _request_code;
@@ -878,6 +887,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
878887
// Fields will be used when making requests
879888
Protocol::PackRequest _pack_request;
880889
const google::protobuf::MethodDescriptor* _method;
890+
const brpc::flatbuffers::MethodDescriptor* _fb_method;
881891
const Authenticator* _auth;
882892
butil::IOBuf _request_buf;
883893
IdlNames _idl_names;

src/brpc/details/controller_private_accessor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ class ControllerPrivateAccessor {
125125
void set_method(const google::protobuf::MethodDescriptor* method)
126126
{ _cntl->_method = method; }
127127

128+
void set_fb_method(const brpc::flatbuffers::MethodDescriptor* method)
129+
{ _cntl->_fb_method = method; }
130+
128131
void set_readable_progressive_attachment(ReadableProgressiveAttachment* s)
129132
{ _cntl->_rpa.reset(s); }
130133

src/brpc/details/server_private_accessor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ class ServerPrivateAccessor {
8686
return _server->FindServicePropertyByName(name);
8787
}
8888

89+
const Server::FlatBuffersMethodProperty* FindFlatBufferMethodPropertyByIndex(
90+
uint32_t server_index, int method_index) const {
91+
return _server->FindFlatBufferMethodPropertyByIndex(server_index, method_index);
92+
}
93+
8994
const Server::ServiceProperty*
9095
FindServicePropertyAdaptively(const butil::StringPiece& service_name) const {
9196
if (service_name.find('.') == butil::StringPiece::npos) {

src/brpc/global.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
// Protocols
6868
#include "brpc/protocol.h"
6969
#include "brpc/policy/baidu_rpc_protocol.h"
70+
#include "brpc/policy/flatbuffers_protocol.h"
7071
#include "brpc/policy/http_rpc_protocol.h"
7172
#include "brpc/policy/http2_rpc_protocol.h"
7273
#include "brpc/policy/hulu_pbrpc_protocol.h"
@@ -423,6 +424,15 @@ static void GlobalInitializeOrDieImpl() {
423424
exit(1);
424425
}
425426

427+
Protocol fb_protocol = { ParseFlatBuffersMessage,
428+
SerializeFlatBuffersRequest, PackFlatBuffersRequest,
429+
ProcessFlatBuffersRequest, ProcessFlatBuffersResponse,
430+
NULL, NULL, NULL,
431+
CONNECTION_TYPE_SINGLE, "fb_rpc" };
432+
if (RegisterProtocol(PROTOCOL_FLATBUFFERS_RPC, fb_protocol) != 0) {
433+
exit(1);
434+
}
435+
426436
Protocol streaming_protocol = { ParseStreamingMessage,
427437
NULL, NULL, ProcessStreamingMessage,
428438
ProcessStreamingMessage,

src/brpc/options.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ enum ProtocolType {
6565
PROTOCOL_ESP = 25; // Client side only
6666
PROTOCOL_H2 = 26;
6767
PROTOCOL_COUCHBASE = 27;
68+
PROTOCOL_FLATBUFFERS_RPC = 28;
6869
}
6970

7071
enum CompressType {

src/brpc/policy/baidu_rpc_protocol.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,8 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
10131013
}
10141014

10151015
void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
1016-
const google::protobuf::Message* request) {
1016+
const void* request_obj) {
1017+
const google::protobuf::Message* request = static_cast<const google::protobuf::Message*>(request_obj);
10171018
// Check sanity of request.
10181019
if (NULL == request) {
10191020
return cntl->SetFailed(EREQUEST, "`request' is NULL");
@@ -1045,10 +1046,12 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
10451046
void PackRpcRequest(butil::IOBuf* req_buf,
10461047
SocketMessage**,
10471048
uint64_t correlation_id,
1048-
const google::protobuf::MethodDescriptor* method,
1049+
const void* method_descriptor,
10491050
Controller* cntl,
10501051
const butil::IOBuf& request_body,
10511052
const Authenticator* auth) {
1053+
const google::protobuf::MethodDescriptor* method =
1054+
static_cast<const google::protobuf::MethodDescriptor*>(method_descriptor);
10521055
RpcMeta meta;
10531056
if (auth && auth->GenerateCredential(
10541057
meta.mutable_authentication_data()) != 0) {

0 commit comments

Comments
 (0)