method_handler.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
  19. #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
  20. // IWYU pragma: private, include <grpcpp/support/method_handler.h>
  21. #include <grpcpp/impl/codegen/byte_buffer.h>
  22. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  23. #include <grpcpp/impl/codegen/rpc_service_method.h>
  24. #include <grpcpp/impl/codegen/sync_stream.h>
  25. namespace grpc {
  26. namespace internal {
  27. // Invoke the method handler, fill in the status, and
  28. // return whether or not we finished safely (without an exception).
  29. // Note that exception handling is 0-cost in most compiler/library
  30. // implementations (except when an exception is actually thrown),
  31. // so this process doesn't require additional overhead in the common case.
  32. // Additionally, we don't need to return if we caught an exception or not;
  33. // the handling is the same in either case.
  34. template <class Callable>
  35. ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
  36. #if GRPC_ALLOW_EXCEPTIONS
  37. try {
  38. return handler();
  39. } catch (...) {
  40. return ::grpc::Status(::grpc::StatusCode::UNKNOWN,
  41. "Unexpected error in RPC handling");
  42. }
  43. #else // GRPC_ALLOW_EXCEPTIONS
  44. return handler();
  45. #endif // GRPC_ALLOW_EXCEPTIONS
  46. }
  47. /// A helper function with reduced templating to do the common work needed to
  48. /// actually send the server response. Uses non-const parameter for Status since
  49. /// this should only ever be called from the end of the RunHandler method.
  50. template <class ResponseType>
  51. void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
  52. ResponseType* rsp, ::grpc::Status& status) {
  53. GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
  54. ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
  55. ::grpc::internal::CallOpSendMessage,
  56. ::grpc::internal::CallOpServerSendStatus>
  57. ops;
  58. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  59. param.server_context->initial_metadata_flags());
  60. if (param.server_context->compression_level_set()) {
  61. ops.set_compression_level(param.server_context->compression_level());
  62. }
  63. if (status.ok()) {
  64. status = ops.SendMessagePtr(rsp);
  65. }
  66. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  67. param.call->PerformOps(&ops);
  68. param.call->cq()->Pluck(&ops);
  69. }
  70. /// A helper function with reduced templating to do deserializing.
  71. template <class RequestType>
  72. void* UnaryDeserializeHelper(grpc_byte_buffer* req, ::grpc::Status* status,
  73. RequestType* request) {
  74. ::grpc::ByteBuffer buf;
  75. buf.set_buffer(req);
  76. *status = ::grpc::SerializationTraits<RequestType>::Deserialize(
  77. &buf, static_cast<RequestType*>(request));
  78. buf.Release();
  79. if (status->ok()) {
  80. return request;
  81. }
  82. request->~RequestType();
  83. return nullptr;
  84. }
  85. /// A wrapper class of an application provided rpc method handler.
  86. template <class ServiceType, class RequestType, class ResponseType,
  87. class BaseRequestType = RequestType,
  88. class BaseResponseType = ResponseType>
  89. class RpcMethodHandler : public ::grpc::internal::MethodHandler {
  90. public:
  91. RpcMethodHandler(
  92. std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
  93. const RequestType*, ResponseType*)>
  94. func,
  95. ServiceType* service)
  96. : func_(func), service_(service) {}
  97. void RunHandler(const HandlerParameter& param) final {
  98. ResponseType rsp;
  99. ::grpc::Status status = param.status;
  100. if (status.ok()) {
  101. status = CatchingFunctionHandler([this, &param, &rsp] {
  102. return func_(service_,
  103. static_cast<::grpc::ServerContext*>(param.server_context),
  104. static_cast<RequestType*>(param.request), &rsp);
  105. });
  106. static_cast<RequestType*>(param.request)->~RequestType();
  107. }
  108. UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
  109. }
  110. void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
  111. ::grpc::Status* status, void** /*handler_data*/) final {
  112. auto* request =
  113. new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
  114. call, sizeof(RequestType))) RequestType;
  115. return UnaryDeserializeHelper(req, status,
  116. static_cast<BaseRequestType*>(request));
  117. }
  118. private:
  119. /// Application provided rpc handler function.
  120. std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
  121. const RequestType*, ResponseType*)>
  122. func_;
  123. // The class the above handler function lives in.
  124. ServiceType* service_;
  125. };
  126. /// A wrapper class of an application provided client streaming handler.
  127. template <class ServiceType, class RequestType, class ResponseType>
  128. class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
  129. public:
  130. ClientStreamingHandler(
  131. std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
  132. ServerReader<RequestType>*, ResponseType*)>
  133. func,
  134. ServiceType* service)
  135. : func_(func), service_(service) {}
  136. void RunHandler(const HandlerParameter& param) final {
  137. ServerReader<RequestType> reader(
  138. param.call, static_cast<::grpc::ServerContext*>(param.server_context));
  139. ResponseType rsp;
  140. ::grpc::Status status = CatchingFunctionHandler([this, &param, &reader,
  141. &rsp] {
  142. return func_(service_,
  143. static_cast<::grpc::ServerContext*>(param.server_context),
  144. &reader, &rsp);
  145. });
  146. ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
  147. ::grpc::internal::CallOpSendMessage,
  148. ::grpc::internal::CallOpServerSendStatus>
  149. ops;
  150. if (!param.server_context->sent_initial_metadata_) {
  151. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  152. param.server_context->initial_metadata_flags());
  153. if (param.server_context->compression_level_set()) {
  154. ops.set_compression_level(param.server_context->compression_level());
  155. }
  156. }
  157. if (status.ok()) {
  158. status = ops.SendMessagePtr(&rsp);
  159. }
  160. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  161. param.call->PerformOps(&ops);
  162. param.call->cq()->Pluck(&ops);
  163. }
  164. private:
  165. std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
  166. ServerReader<RequestType>*, ResponseType*)>
  167. func_;
  168. ServiceType* service_;
  169. };
  170. /// A wrapper class of an application provided server streaming handler.
  171. template <class ServiceType, class RequestType, class ResponseType>
  172. class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
  173. public:
  174. ServerStreamingHandler(std::function<::grpc::Status(
  175. ServiceType*, ::grpc::ServerContext*,
  176. const RequestType*, ServerWriter<ResponseType>*)>
  177. func,
  178. ServiceType* service)
  179. : func_(func), service_(service) {}
  180. void RunHandler(const HandlerParameter& param) final {
  181. ::grpc::Status status = param.status;
  182. if (status.ok()) {
  183. ServerWriter<ResponseType> writer(
  184. param.call,
  185. static_cast<::grpc::ServerContext*>(param.server_context));
  186. status = CatchingFunctionHandler([this, &param, &writer] {
  187. return func_(service_,
  188. static_cast<::grpc::ServerContext*>(param.server_context),
  189. static_cast<RequestType*>(param.request), &writer);
  190. });
  191. static_cast<RequestType*>(param.request)->~RequestType();
  192. }
  193. ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
  194. ::grpc::internal::CallOpServerSendStatus>
  195. ops;
  196. if (!param.server_context->sent_initial_metadata_) {
  197. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  198. param.server_context->initial_metadata_flags());
  199. if (param.server_context->compression_level_set()) {
  200. ops.set_compression_level(param.server_context->compression_level());
  201. }
  202. }
  203. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  204. param.call->PerformOps(&ops);
  205. if (param.server_context->has_pending_ops_) {
  206. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  207. }
  208. param.call->cq()->Pluck(&ops);
  209. }
  210. void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
  211. ::grpc::Status* status, void** /*handler_data*/) final {
  212. ::grpc::ByteBuffer buf;
  213. buf.set_buffer(req);
  214. auto* request =
  215. new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
  216. call, sizeof(RequestType))) RequestType();
  217. *status =
  218. ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
  219. buf.Release();
  220. if (status->ok()) {
  221. return request;
  222. }
  223. request->~RequestType();
  224. return nullptr;
  225. }
  226. private:
  227. std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
  228. const RequestType*, ServerWriter<ResponseType>*)>
  229. func_;
  230. ServiceType* service_;
  231. };
  232. /// A wrapper class of an application provided bidi-streaming handler.
  233. /// This also applies to server-streamed implementation of a unary method
  234. /// with the additional requirement that such methods must have done a
  235. /// write for status to be ok
  236. /// Since this is used by more than 1 class, the service is not passed in.
  237. /// Instead, it is expected to be an implicitly-captured argument of func
  238. /// (through bind or something along those lines)
  239. template <class Streamer, bool WriteNeeded>
  240. class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
  241. public:
  242. explicit TemplatedBidiStreamingHandler(
  243. std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func)
  244. : func_(func), write_needed_(WriteNeeded) {}
  245. void RunHandler(const HandlerParameter& param) final {
  246. Streamer stream(param.call,
  247. static_cast<::grpc::ServerContext*>(param.server_context));
  248. ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
  249. return func_(static_cast<::grpc::ServerContext*>(param.server_context),
  250. &stream);
  251. });
  252. ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
  253. ::grpc::internal::CallOpServerSendStatus>
  254. ops;
  255. if (!param.server_context->sent_initial_metadata_) {
  256. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  257. param.server_context->initial_metadata_flags());
  258. if (param.server_context->compression_level_set()) {
  259. ops.set_compression_level(param.server_context->compression_level());
  260. }
  261. if (write_needed_ && status.ok()) {
  262. // If we needed a write but never did one, we need to mark the
  263. // status as a fail
  264. status = ::grpc::Status(::grpc::StatusCode::INTERNAL,
  265. "Service did not provide response message");
  266. }
  267. }
  268. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  269. param.call->PerformOps(&ops);
  270. if (param.server_context->has_pending_ops_) {
  271. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  272. }
  273. param.call->cq()->Pluck(&ops);
  274. }
  275. private:
  276. std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func_;
  277. const bool write_needed_;
  278. };
  279. template <class ServiceType, class RequestType, class ResponseType>
  280. class BidiStreamingHandler
  281. : public TemplatedBidiStreamingHandler<
  282. ServerReaderWriter<ResponseType, RequestType>, false> {
  283. public:
  284. BidiStreamingHandler(std::function<::grpc::Status(
  285. ServiceType*, ::grpc::ServerContext*,
  286. ServerReaderWriter<ResponseType, RequestType>*)>
  287. func,
  288. ServiceType* service)
  289. // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
  290. : TemplatedBidiStreamingHandler<
  291. ServerReaderWriter<ResponseType, RequestType>, false>(
  292. [func, service](
  293. ::grpc::ServerContext* ctx,
  294. ServerReaderWriter<ResponseType, RequestType>* streamer) {
  295. return func(service, ctx, streamer);
  296. }) {}
  297. };
  298. template <class RequestType, class ResponseType>
  299. class StreamedUnaryHandler
  300. : public TemplatedBidiStreamingHandler<
  301. ServerUnaryStreamer<RequestType, ResponseType>, true> {
  302. public:
  303. explicit StreamedUnaryHandler(
  304. std::function<
  305. ::grpc::Status(::grpc::ServerContext*,
  306. ServerUnaryStreamer<RequestType, ResponseType>*)>
  307. func)
  308. : TemplatedBidiStreamingHandler<
  309. ServerUnaryStreamer<RequestType, ResponseType>, true>(
  310. std::move(func)) {}
  311. };
  312. template <class RequestType, class ResponseType>
  313. class SplitServerStreamingHandler
  314. : public TemplatedBidiStreamingHandler<
  315. ServerSplitStreamer<RequestType, ResponseType>, false> {
  316. public:
  317. explicit SplitServerStreamingHandler(
  318. std::function<
  319. ::grpc::Status(::grpc::ServerContext*,
  320. ServerSplitStreamer<RequestType, ResponseType>*)>
  321. func)
  322. : TemplatedBidiStreamingHandler<
  323. ServerSplitStreamer<RequestType, ResponseType>, false>(
  324. std::move(func)) {}
  325. };
  326. /// General method handler class for errors that prevent real method use
  327. /// e.g., handle unknown method by returning UNIMPLEMENTED error.
  328. template <::grpc::StatusCode code>
  329. class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
  330. public:
  331. explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
  332. template <class T>
  333. static void FillOps(::grpc::ServerContextBase* context,
  334. const std::string& message, T* ops) {
  335. ::grpc::Status status(code, message);
  336. if (!context->sent_initial_metadata_) {
  337. ops->SendInitialMetadata(&context->initial_metadata_,
  338. context->initial_metadata_flags());
  339. if (context->compression_level_set()) {
  340. ops->set_compression_level(context->compression_level());
  341. }
  342. context->sent_initial_metadata_ = true;
  343. }
  344. ops->ServerSendStatus(&context->trailing_metadata_, status);
  345. }
  346. void RunHandler(const HandlerParameter& param) final {
  347. ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
  348. ::grpc::internal::CallOpServerSendStatus>
  349. ops;
  350. FillOps(param.server_context, message_, &ops);
  351. param.call->PerformOps(&ops);
  352. param.call->cq()->Pluck(&ops);
  353. }
  354. void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
  355. ::grpc::Status* /*status*/, void** /*handler_data*/) final {
  356. // We have to destroy any request payload
  357. if (req != nullptr) {
  358. ::grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
  359. }
  360. return nullptr;
  361. }
  362. private:
  363. const std::string message_;
  364. };
  365. typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
  366. UnknownMethodHandler;
  367. typedef ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED>
  368. ResourceExhaustedHandler;
  369. } // namespace internal
  370. } // namespace grpc
  371. #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H