2323#include " brpc/rdma/rdma_helper.h"
2424
2525namespace brpc {
26- DECLARE_bool (usercode_in_coroutine);
27- DECLARE_bool (usercode_in_pthread);
28-
29- extern SocketVarsCollector *g_vars;
30-
31- void RdmaTransport::Init (Socket *socket, const SocketOptions &options) {
32- CHECK (_rdma_ep == NULL );
33- if (options.socket_mode == SOCKET_MODE_RDMA) {
34- _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint (socket);
35- if (!_rdma_ep) {
36- const int saved_errno = errno;
37- PLOG (ERROR) << " Fail to create RdmaEndpoint" ;
38- socket->SetFailed (saved_errno, " Fail to create RdmaEndpoint: %s" ,
39- berror (saved_errno));
40- }
41- _rdma_state = RDMA_UNKNOWN;
42- } else {
43- _rdma_state = RDMA_OFF;
44- socket->_socket_mode = SOCKET_MODE_TCP;
45- }
46- _socket = socket;
47- _default_connect = options.app_connect ;
48- _on_edge_trigger = options.on_edge_triggered_events ;
49- if (options.need_on_edge_trigger && _on_edge_trigger == NULL ) {
50- _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
51- }
52- _tcp_transport = std::make_shared<TcpTransport>();
53- _tcp_transport->Init (socket, options);
26+ DECLARE_bool (usercode_in_coroutine);
27+ DECLARE_bool (usercode_in_pthread);
28+
29+ extern SocketVarsCollector *g_vars;
30+
31+ void RdmaTransport::Init (Socket *socket, const SocketOptions &options) {
32+ CHECK (_rdma_ep == NULL );
33+ if (options.socket_mode == SOCKET_MODE_RDMA) {
34+ _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint (socket);
35+ if (!_rdma_ep) {
36+ const int saved_errno = errno;
37+ PLOG (ERROR) << " Fail to create RdmaEndpoint" ;
38+ socket->SetFailed (saved_errno, " Fail to create RdmaEndpoint: %s" ,
39+ berror (saved_errno));
40+ }
41+ _rdma_state = RDMA_UNKNOWN;
42+ } else {
43+ _rdma_state = RDMA_OFF;
44+ socket->_socket_mode = SOCKET_MODE_TCP;
5445 }
46+ _socket = socket;
47+ _default_connect = options.app_connect ;
48+ _on_edge_trigger = options.on_edge_triggered_events ;
49+ if (options.need_on_edge_trigger && _on_edge_trigger == NULL ) {
50+ _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
51+ }
52+ _tcp_transport = std::make_shared<TcpTransport>();
53+ _tcp_transport->Init (socket, options);
54+ }
5555
56- void RdmaTransport::Release () {
57- if (_rdma_ep) {
58- delete _rdma_ep;
59- _rdma_ep = NULL ;
60- _rdma_state = RDMA_UNKNOWN;
61- }
56+ void RdmaTransport::Release () {
57+ if (_rdma_ep) {
58+ delete _rdma_ep;
59+ _rdma_ep = NULL ;
60+ _rdma_state = RDMA_UNKNOWN;
6261 }
62+ }
6363
64- int RdmaTransport::Reset (int32_t expected_nref) {
65- if (_rdma_ep) {
66- _rdma_ep->Reset ();
67- _rdma_state = RDMA_UNKNOWN;
68- }
69- return 0 ;
64+ int RdmaTransport::Reset (int32_t expected_nref) {
65+ if (_rdma_ep) {
66+ _rdma_ep->Reset ();
67+ _rdma_state = RDMA_UNKNOWN;
7068 }
69+ return 0 ;
70+ }
7171
72- std::shared_ptr<AppConnect> RdmaTransport::Connect () {
73- if (_default_connect == nullptr ) {
74- return std::make_shared<rdma::RdmaConnect>();
75- }
76- return _default_connect;
72+ std::shared_ptr<AppConnect> RdmaTransport::Connect () {
73+ if (_default_connect == nullptr ) {
74+ return std::make_shared<rdma::RdmaConnect>();
7775 }
76+ return _default_connect;
77+ }
7878
79- int RdmaTransport::CutFromIOBuf (butil::IOBuf *buf) {
80- if (_rdma_ep && _rdma_state != RDMA_OFF) {
81- butil::IOBuf *data_arr[1 ] = {buf};
82- return _rdma_ep->CutFromIOBufList (data_arr, 1 );
83- } else {
84- return _tcp_transport->CutFromIOBuf (buf);
85- }
79+ int RdmaTransport::CutFromIOBuf (butil::IOBuf *buf) {
80+ if (_rdma_ep && _rdma_state != RDMA_OFF) {
81+ butil::IOBuf *data_arr[1 ] = {buf};
82+ return _rdma_ep->CutFromIOBufList (data_arr, 1 );
83+ } else {
84+ return _tcp_transport->CutFromIOBuf (buf);
8685 }
86+ }
8787
88- ssize_t RdmaTransport::CutFromIOBufList (butil::IOBuf **buf, size_t ndata) {
89- if (_rdma_ep && _rdma_state != RDMA_OFF) {
90- return _rdma_ep->CutFromIOBufList (buf, ndata);
91- }
92- return butil::IOBuf::cut_multiple_into_file_descriptor (_socket->fd (), buf, ndata);
93- }
94-
95- int RdmaTransport::WaitEpollOut (butil::atomic<int > *_epollout_butex,
96- bool pollin, const timespec duetime) {
97- if (_rdma_state == RDMA_ON) {
98- const int expected_val = _epollout_butex
99- ->load (butil::memory_order_acquire);
100- CHECK (_rdma_ep != NULL );
101- if (!_rdma_ep->IsWritable ()) {
102- g_vars->nwaitepollout << 1 ;
103- if (bthread::butex_wait (_epollout_butex,
104- expected_val, &duetime) < 0 ) {
105- if (errno != EAGAIN && errno != ETIMEDOUT) {
106- const int saved_errno = errno;
107- PLOG (WARNING) << " Fail to wait rdma window of " << _socket;
108- _socket->SetFailed (saved_errno, " Fail to wait rdma window of %s: %s" ,
109- _socket->description ().c_str (), berror (saved_errno));
110- }
111- if (_socket->Failed ()) {
112- // NOTE:
113- // Different from TCP, we cannot find the RDMA channel
114- // failed by writing to it. Thus we must check if it
115- // is already failed here.
116- return 1 ;
117- }
118- }
119- }
120- } else {
121- return _tcp_transport->WaitEpollOut (_epollout_butex, pollin, duetime);
122- }
123- return 0 ;
88+ ssize_t RdmaTransport::CutFromIOBufList (butil::IOBuf **buf, size_t ndata) {
89+ if (_rdma_ep && _rdma_state != RDMA_OFF) {
90+ return _rdma_ep->CutFromIOBufList (buf, ndata);
12491 }
92+ return butil::IOBuf::cut_multiple_into_file_descriptor (_socket->fd (), buf, ndata);
93+ }
12594
126- void RdmaTransport::ProcessEvent (bthread_attr_t attr) {
127- bthread_t tid;
128- if (FLAGS_usercode_in_coroutine) {
129- OnEdge (_socket);
130- } else if (rdma::FLAGS_rdma_edisp_unsched == false ) {
131- auto rc = bthread_start_background (&tid, &attr, OnEdge, _socket);
132- if (rc != 0 ) {
133- LOG (FATAL) << " Fail to start ProcessEvent" ;
134- OnEdge (_socket);
135- }
136- } else if (bthread_start_urgent (&tid, &attr, OnEdge, _socket) != 0 ) {
95+ int RdmaTransport::WaitEpollOut (butil::atomic<int > *_epollout_butex,
96+ bool pollin, const timespec duetime) {
97+ if (_rdma_state == RDMA_ON) {
98+ const int expected_val = _epollout_butex
99+ ->load (butil::memory_order_acquire);
100+ CHECK (_rdma_ep != NULL );
101+ if (!_rdma_ep->IsWritable ()) {
102+ g_vars->nwaitepollout << 1 ;
103+ if (bthread::butex_wait (_epollout_butex, expected_val, &duetime) < 0 ) {
104+ if (errno != EAGAIN && errno != ETIMEDOUT) {
105+ const int saved_errno = errno;
106+ PLOG (WARNING) << " Fail to wait rdma window of " << _socket;
107+ _socket->SetFailed (saved_errno, " Fail to wait rdma window of %s: %s" ,
108+ _socket->description ().c_str (), berror (saved_errno));
109+ }
110+ if (_socket->Failed ()) {
111+ // NOTE:
112+ // Different from TCP, we cannot find the RDMA channel
113+ // failed by writing to it. Thus we must check if it
114+ // is already failed here.
115+ return 1 ;
116+ }
117+ }
118+ }
119+ } else {
120+ return _tcp_transport->WaitEpollOut (_epollout_butex, pollin, duetime);
121+ }
122+ return 0 ;
123+ }
124+
125+ void RdmaTransport::ProcessEvent (bthread_attr_t attr) {
126+ bthread_t tid;
127+ if (FLAGS_usercode_in_coroutine) {
128+ OnEdge (_socket);
129+ } else if (rdma::FLAGS_rdma_edisp_unsched == false ) {
130+ auto rc = bthread_start_background (&tid, &attr, OnEdge, _socket);
131+ if (rc != 0 ) {
137132 LOG (FATAL) << " Fail to start ProcessEvent" ;
138133 OnEdge (_socket);
139134 }
135+ } else if (bthread_start_urgent (&tid, &attr, OnEdge, _socket) != 0 ) {
136+ LOG (FATAL) << " Fail to start ProcessEvent" ;
137+ OnEdge (_socket);
140138 }
139+ }
141140
142- void RdmaTransport::QueueMessage (InputMessageClosure& input_msg, int * num_bthread_created, bool last_msg) {
143- if (last_msg && !rdma::FLAGS_rdma_use_polling) {
144- return ;
145- }
146- InputMessageBase* to_run_msg = input_msg.release ();
147- if (!to_run_msg) {
148- return ;
149- }
150-
151- if (rdma::FLAGS_rdma_disable_bthread) {
152- ProcessInputMessage (to_run_msg);
153- return ;
154- }
155- // Create bthread for last_msg. The bthread is not scheduled
156- // until bthread_flush() is called (in the worse case).
157-
158- // TODO(gejun): Join threads.
159- bthread_t th;
160- bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
161- BTHREAD_ATTR_PTHREAD :
162- BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
163- tmp.keytable_pool = _socket->keytable_pool ();
164- tmp.tag = bthread_self_tag ();
165- bthread_attr_set_name (&tmp, " ProcessInputMessage" );
166-
167- if (!FLAGS_usercode_in_coroutine && bthread_start_background (
168- &th, &tmp, ProcessInputMessage, to_run_msg) == 0 ) {
169- ++*num_bthread_created;
170- } else {
171- ProcessInputMessage (to_run_msg);
172- }
141+ void RdmaTransport::QueueMessage (InputMessageClosure& input_msg, int * num_bthread_created, bool last_msg) {
142+ if (last_msg && !rdma::FLAGS_rdma_use_polling) {
143+ return ;
173144 }
174-
175- void RdmaTransport::Debug (std::ostream &os) {
176- if (_rdma_state == RDMA_ON && _rdma_ep) {
177- _rdma_ep->DebugInfo (os);
178- }
145+ InputMessageBase* to_run_msg = input_msg.release ();
146+ if (!to_run_msg) {
147+ return ;
179148 }
180149
181- int RdmaTransport::ContextInitOrDie (bool serverOrNot, const void * _options) {
182- if (serverOrNot) {
183- if (!OptionsAvailableOverRdma (static_cast <const ServerOptions *>(_options))) {
184- return -1 ;
185- }
186- rdma::GlobalRdmaInitializeOrDie ();
187- if (!rdma::InitPollingModeWithTag (static_cast <const ServerOptions *>(_options)->bthread_tag )) {
188- return -1 ;
189- }
190- } else {
191- if (!OptionsAvailableForRdma (static_cast <const ChannelOptions *>(_options))) {
192- return -1 ;
193- }
194- rdma::GlobalRdmaInitializeOrDie ();
195- if (!rdma::InitPollingModeWithTag (bthread_self_tag ())) {
196- return -1 ;
197- }
198- return 0 ;
199- }
200-
201- return 0 ;
150+ if (rdma::FLAGS_rdma_disable_bthread) {
151+ ProcessInputMessage (to_run_msg);
152+ return ;
202153 }
154+ // Create bthread for last_msg. The bthread is not scheduled
155+ // until bthread_flush() is called (in the worse case).
156+
157+ // TODO(gejun): Join threads.
158+ bthread_t th;
159+ bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
160+ BTHREAD_ATTR_PTHREAD :
161+ BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
162+ tmp.keytable_pool = _socket->keytable_pool ();
163+ tmp.tag = bthread_self_tag ();
164+ bthread_attr_set_name (&tmp, " ProcessInputMessage" );
165+
166+ if (!FLAGS_usercode_in_coroutine && bthread_start_background (
167+ &th, &tmp, ProcessInputMessage, to_run_msg) == 0 ) {
168+ ++*num_bthread_created;
169+ } else {
170+ ProcessInputMessage (to_run_msg);
171+ }
172+ }
203173
204- bool RdmaTransport::OptionsAvailableForRdma (const ChannelOptions* opt) {
205- if (opt->has_ssl_options ()) {
206- LOG (WARNING) << " Cannot use SSL and RDMA at the same time" ;
207- return false ;
208- }
209- if (!rdma::SupportedByRdma (opt->protocol .name ())) {
210- LOG (WARNING) << " Cannot use " << opt->protocol .name ()
211- << " over RDMA" ;
212- return false ;
213- }
214- return true ;
174+ void RdmaTransport::Debug (std::ostream &os) {
175+ if (_rdma_state == RDMA_ON && _rdma_ep) {
176+ _rdma_ep->DebugInfo (os);
215177 }
178+ }
216179
217- bool RdmaTransport::OptionsAvailableOverRdma ( const ServerOptions* opt ) {
218- if (opt-> rtmp_service ) {
219- LOG (WARNING) << " RTMP is not supported by RDMA " ;
220- return false ;
180+ int RdmaTransport::ContextInitOrDie ( bool serverOrNot, const void * _options ) {
181+ if (serverOrNot ) {
182+ if (! OptionsAvailableOverRdma ( static_cast < const ServerOptions *>(_options))) {
183+ return - 1 ;
221184 }
222- if (opt-> has_ssl_options ()) {
223- LOG (WARNING) << " SSL is not supported by RDMA " ;
224- return false ;
185+ rdma::GlobalRdmaInitializeOrDie ();
186+ if (! rdma::InitPollingModeWithTag ( static_cast < const ServerOptions *>(_options)-> bthread_tag )) {
187+ return - 1 ;
225188 }
226- if (opt-> nshead_service ) {
227- LOG (WARNING) << " NSHEAD is not supported by RDMA " ;
228- return false ;
189+ } else {
190+ if (! OptionsAvailableForRdma ( static_cast < const ChannelOptions *>(_options))) {
191+ return - 1 ;
229192 }
230- if (opt-> mongo_service_adaptor ) {
231- LOG (WARNING) << " MONGO is not supported by RDMA " ;
232- return false ;
193+ rdma::GlobalRdmaInitializeOrDie ();
194+ if (! rdma::InitPollingModeWithTag ( bthread_self_tag ())) {
195+ return - 1 ;
233196 }
234- return true ;
197+ return 0 ;
235198 }
199+
200+ return 0 ;
201+ }
202+
203+ bool RdmaTransport::OptionsAvailableForRdma (const ChannelOptions* opt) {
204+ if (opt->has_ssl_options ()) {
205+ LOG (WARNING) << " Cannot use SSL and RDMA at the same time" ;
206+ return false ;
207+ }
208+ if (!rdma::SupportedByRdma (opt->protocol .name ())) {
209+ LOG (WARNING) << " Cannot use " << opt->protocol .name ()
210+ << " over RDMA" ;
211+ return false ;
212+ }
213+ return true ;
214+ }
215+
216+ bool RdmaTransport::OptionsAvailableOverRdma (const ServerOptions* opt) {
217+ if (opt->rtmp_service ) {
218+ LOG (WARNING) << " RTMP is not supported by RDMA" ;
219+ return false ;
220+ }
221+ if (opt->has_ssl_options ()) {
222+ LOG (WARNING) << " SSL is not supported by RDMA" ;
223+ return false ;
224+ }
225+ if (opt->nshead_service ) {
226+ LOG (WARNING) << " NSHEAD is not supported by RDMA" ;
227+ return false ;
228+ }
229+ if (opt->mongo_service_adaptor ) {
230+ LOG (WARNING) << " MONGO is not supported by RDMA" ;
231+ return false ;
232+ }
233+ return true ;
234+ }
236235}
237236#endif
0 commit comments