1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <algorithm> 20 #include <forward_list> 21 #include <functional> 22 #include <memory> 23 #include <mutex> 24 #include <thread> 25 26 #include <grpc/grpc.h> 27 #include <grpc/support/alloc.h> 28 #include <grpc/support/log.h> 29 #include <grpcpp/generic/async_generic_service.h> 30 #include <grpcpp/resource_quota.h> 31 #include <grpcpp/security/server_credentials.h> 32 #include <grpcpp/server.h> 33 #include <grpcpp/server_builder.h> 34 #include <grpcpp/server_context.h> 35 #include <grpcpp/support/config.h> 36 37 #include "src/core/lib/gpr/host_port.h" 38 #include "src/core/lib/surface/completion_queue.h" 39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" 40 #include "test/core/util/test_config.h" 41 #include "test/cpp/qps/qps_server_builder.h" 42 #include "test/cpp/qps/server.h" 43 44 namespace grpc { 45 namespace testing { 46 47 template <class RequestType, class ResponseType, class ServiceType, 48 class ServerContextType> 49 class AsyncQpsServerTest final : public grpc::testing::Server { 50 public: 51 AsyncQpsServerTest( 52 const ServerConfig& config, 53 std::function<void(ServerBuilder*, ServiceType*)> register_service, 54 std::function<void(ServiceType*, ServerContextType*, RequestType*, 55 ServerAsyncResponseWriter<ResponseType>*, 56 CompletionQueue*, ServerCompletionQueue*, void*)> 57 request_unary_function, 58 std::function<void(ServiceType*, ServerContextType*, 59 ServerAsyncReaderWriter<ResponseType, RequestType>*, 60 CompletionQueue*, ServerCompletionQueue*, void*)> 61 request_streaming_function, 62 std::function<void(ServiceType*, ServerContextType*, 63 ServerAsyncReader<ResponseType, RequestType>*, 64 CompletionQueue*, ServerCompletionQueue*, void*)> 65 request_streaming_from_client_function, 66 std::function<void(ServiceType*, ServerContextType*, RequestType*, 67 ServerAsyncWriter<ResponseType>*, CompletionQueue*, 68 ServerCompletionQueue*, void*)> 69 request_streaming_from_server_function, 70 std::function<void(ServiceType*, ServerContextType*, 71 ServerAsyncReaderWriter<ResponseType, RequestType>*, 72 CompletionQueue*, ServerCompletionQueue*, void*)> 73 request_streaming_both_ways_function, 74 std::function<grpc::Status(const PayloadConfig&, RequestType*, 75 ResponseType*)> 76 process_rpc) 77 : Server(config) { 78 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder(); 79 80 auto port_num = port(); 81 // Negative port number means inproc server, so no listen port needed 82 if (port_num >= 0) { 83 char* server_address = nullptr; 84 gpr_join_host_port(&server_address, "::", port_num); 85 builder->AddListeningPort(server_address, 86 Server::CreateServerCredentials(config)); 87 gpr_free(server_address); 88 } 89 90 register_service(builder.get(), &async_service_); 91 92 int num_threads = config.async_server_threads(); 93 if (num_threads <= 0) { // dynamic sizing 94 num_threads = cores(); 95 gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads); 96 } 97 98 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified 99 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator 100 for (int i = 0; i < num_cqs; i++) { 101 srv_cqs_.emplace_back(builder->AddCompletionQueue()); 102 } 103 for (int i = 0; i < num_threads; i++) { 104 cq_.emplace_back(i % srv_cqs_.size()); 105 } 106 107 ApplyConfigToBuilder(config, builder.get()); 108 109 server_ = builder->BuildAndStart(); 110 111 auto process_rpc_bound = 112 std::bind(process_rpc, config.payload_config(), std::placeholders::_1, 113 std::placeholders::_2); 114 115 for (int i = 0; i < 5000; i++) { 116 for (int j = 0; j < num_cqs; j++) { 117 if (request_unary_function) { 118 auto request_unary = std::bind( 119 request_unary_function, &async_service_, std::placeholders::_1, 120 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(), 121 srv_cqs_[j].get(), std::placeholders::_4); 122 contexts_.emplace_back( 123 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); 124 } 125 if (request_streaming_function) { 126 auto request_streaming = std::bind( 127 request_streaming_function, &async_service_, 128 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), 129 srv_cqs_[j].get(), std::placeholders::_3); 130 contexts_.emplace_back(new ServerRpcContextStreamingImpl( 131 request_streaming, process_rpc_bound)); 132 } 133 if (request_streaming_from_client_function) { 134 auto request_streaming_from_client = std::bind( 135 request_streaming_from_client_function, &async_service_, 136 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), 137 srv_cqs_[j].get(), std::placeholders::_3); 138 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl( 139 request_streaming_from_client, process_rpc_bound)); 140 } 141 if (request_streaming_from_server_function) { 142 auto request_streaming_from_server = 143 std::bind(request_streaming_from_server_function, &async_service_, 144 std::placeholders::_1, std::placeholders::_2, 145 std::placeholders::_3, srv_cqs_[j].get(), 146 srv_cqs_[j].get(), std::placeholders::_4); 147 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl( 148 request_streaming_from_server, process_rpc_bound)); 149 } 150 if (request_streaming_both_ways_function) { 151 // TODO(vjpai): Add this code 152 } 153 } 154 } 155 156 for (int i = 0; i < num_threads; i++) { 157 shutdown_state_.emplace_back(new PerThreadShutdownState()); 158 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); 159 } 160 } 161 ~AsyncQpsServerTest() { 162 for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { 163 std::lock_guard<std::mutex> lock((*ss)->mutex); 164 (*ss)->shutdown = true; 165 } 166 std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this); 167 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { 168 (*cq)->Shutdown(); 169 } 170 for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { 171 thr->join(); 172 } 173 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { 174 bool ok; 175 void* got_tag; 176 while ((*cq)->Next(&got_tag, &ok)) 177 ; 178 } 179 shutdown_thread.join(); 180 } 181 182 int GetPollCount() override { 183 int count = 0; 184 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) { 185 count += grpc_get_cq_poll_num((*cq)->cq()); 186 } 187 return count; 188 } 189 190 std::shared_ptr<Channel> InProcessChannel( 191 const ChannelArguments& args) override { 192 return server_->InProcessChannel(args); 193 } 194 195 private: 196 void ShutdownThreadFunc() { 197 // TODO (vpai): Remove this deadline and allow Shutdown to finish properly 198 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); 199 server_->Shutdown(deadline); 200 } 201 202 void ThreadFunc(int thread_idx) { 203 // Wait until work is available or we are shutting down 204 bool ok; 205 void* got_tag; 206 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { 207 return; 208 } 209 ServerRpcContext* ctx; 210 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex; 211 do { 212 ctx = detag(got_tag); 213 // The tag is a pointer to an RPC context to invoke 214 // Proceed while holding a lock to make sure that 215 // this thread isn't supposed to shut down 216 mu_ptr->lock(); 217 if (shutdown_state_[thread_idx]->shutdown) { 218 mu_ptr->unlock(); 219 return; 220 } 221 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( 222 [&, ctx, ok, mu_ptr]() { 223 ctx->lock(); 224 if (!ctx->RunNextState(ok)) { 225 ctx->Reset(); 226 } 227 ctx->unlock(); 228 mu_ptr->unlock(); 229 }, 230 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); 231 } 232 233 class ServerRpcContext { 234 public: 235 ServerRpcContext() {} 236 void lock() { mu_.lock(); } 237 void unlock() { mu_.unlock(); } 238 virtual ~ServerRpcContext(){}; 239 virtual bool RunNextState(bool) = 0; // next state, return false if done 240 virtual void Reset() = 0; // start this back at a clean state 241 private: 242 std::mutex mu_; 243 }; 244 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); } 245 static ServerRpcContext* detag(void* tag) { 246 return static_cast<ServerRpcContext*>(tag); 247 } 248 249 class ServerRpcContextUnaryImpl final : public ServerRpcContext { 250 public: 251 ServerRpcContextUnaryImpl( 252 std::function<void(ServerContextType*, RequestType*, 253 grpc::ServerAsyncResponseWriter<ResponseType>*, 254 void*)> 255 request_method, 256 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method) 257 : srv_ctx_(new ServerContextType), 258 next_state_(&ServerRpcContextUnaryImpl::invoker), 259 request_method_(request_method), 260 invoke_method_(invoke_method), 261 response_writer_(srv_ctx_.get()) { 262 request_method_(srv_ctx_.get(), &req_, &response_writer_, 263 AsyncQpsServerTest::tag(this)); 264 } 265 ~ServerRpcContextUnaryImpl() override {} 266 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } 267 void Reset() override { 268 srv_ctx_.reset(new ServerContextType); 269 req_ = RequestType(); 270 response_writer_ = 271 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get()); 272 273 // Then request the method 274 next_state_ = &ServerRpcContextUnaryImpl::invoker; 275 request_method_(srv_ctx_.get(), &req_, &response_writer_, 276 AsyncQpsServerTest::tag(this)); 277 } 278 279 private: 280 bool finisher(bool) { return false; } 281 bool invoker(bool ok) { 282 if (!ok) { 283 return false; 284 } 285 286 // Call the RPC processing function 287 grpc::Status status = invoke_method_(&req_, &response_); 288 289 // Have the response writer work and invoke on_finish when done 290 next_state_ = &ServerRpcContextUnaryImpl::finisher; 291 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this)); 292 return true; 293 } 294 std::unique_ptr<ServerContextType> srv_ctx_; 295 RequestType req_; 296 ResponseType response_; 297 bool (ServerRpcContextUnaryImpl::*next_state_)(bool); 298 std::function<void(ServerContextType*, RequestType*, 299 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)> 300 request_method_; 301 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_; 302 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; 303 }; 304 305 class ServerRpcContextStreamingImpl final : public ServerRpcContext { 306 public: 307 ServerRpcContextStreamingImpl( 308 std::function<void( 309 ServerContextType*, 310 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)> 311 request_method, 312 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method) 313 : srv_ctx_(new ServerContextType), 314 next_state_(&ServerRpcContextStreamingImpl::request_done), 315 request_method_(request_method), 316 invoke_method_(invoke_method), 317 stream_(srv_ctx_.get()) { 318 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); 319 } 320 ~ServerRpcContextStreamingImpl() override {} 321 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } 322 void Reset() override { 323 srv_ctx_.reset(new ServerContextType); 324 req_ = RequestType(); 325 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>( 326 srv_ctx_.get()); 327 328 // Then request the method 329 next_state_ = &ServerRpcContextStreamingImpl::request_done; 330 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); 331 } 332 333 private: 334 bool request_done(bool ok) { 335 if (!ok) { 336 return false; 337 } 338 next_state_ = &ServerRpcContextStreamingImpl::read_done; 339 stream_.Read(&req_, AsyncQpsServerTest::tag(this)); 340 return true; 341 } 342 343 bool read_done(bool ok) { 344 if (ok) { 345 // invoke the method 346 // Call the RPC processing function 347 grpc::Status status = invoke_method_(&req_, &response_); 348 // initiate the write 349 next_state_ = &ServerRpcContextStreamingImpl::write_done; 350 stream_.Write(response_, AsyncQpsServerTest::tag(this)); 351 } else { // client has sent writes done 352 // finish the stream 353 next_state_ = &ServerRpcContextStreamingImpl::finish_done; 354 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); 355 } 356 return true; 357 } 358 bool write_done(bool ok) { 359 // now go back and get another streaming read! 360 if (ok) { 361 next_state_ = &ServerRpcContextStreamingImpl::read_done; 362 stream_.Read(&req_, AsyncQpsServerTest::tag(this)); 363 } else { 364 next_state_ = &ServerRpcContextStreamingImpl::finish_done; 365 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); 366 } 367 return true; 368 } 369 bool finish_done(bool ok) { return false; /* reset the context */ } 370 371 std::unique_ptr<ServerContextType> srv_ctx_; 372 RequestType req_; 373 ResponseType response_; 374 bool (ServerRpcContextStreamingImpl::*next_state_)(bool); 375 std::function<void( 376 ServerContextType*, 377 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)> 378 request_method_; 379 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_; 380 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; 381 }; 382 383 class ServerRpcContextStreamingFromClientImpl final 384 : public ServerRpcContext { 385 public: 386 ServerRpcContextStreamingFromClientImpl( 387 std::function<void(ServerContextType*, 388 grpc::ServerAsyncReader<ResponseType, RequestType>*, 389 void*)> 390 request_method, 391 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method) 392 : srv_ctx_(new ServerContextType), 393 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), 394 request_method_(request_method), 395 invoke_method_(invoke_method), 396 stream_(srv_ctx_.get()) { 397 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); 398 } 399 ~ServerRpcContextStreamingFromClientImpl() override {} 400 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } 401 void Reset() override { 402 srv_ctx_.reset(new ServerContextType); 403 req_ = RequestType(); 404 stream_ = 405 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get()); 406 407 // Then request the method 408 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done; 409 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); 410 } 411 412 private: 413 bool request_done(bool ok) { 414 if (!ok) { 415 return false; 416 } 417 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done; 418 stream_.Read(&req_, AsyncQpsServerTest::tag(this)); 419 return true; 420 } 421 422 bool read_done(bool ok) { 423 if (ok) { 424 // In this case, just do another read 425 // next_state_ is unchanged 426 stream_.Read(&req_, AsyncQpsServerTest::tag(this)); 427 return true; 428 } else { // client has sent writes done 429 // invoke the method 430 // Call the RPC processing function 431 grpc::Status status = invoke_method_(&req_, &response_); 432 // finish the stream 433 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done; 434 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this)); 435 } 436 return true; 437 } 438 bool finish_done(bool ok) { return false; /* reset the context */ } 439 440 std::unique_ptr<ServerContextType> srv_ctx_; 441 RequestType req_; 442 ResponseType response_; 443 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool); 444 std::function<void(ServerContextType*, 445 grpc::ServerAsyncReader<ResponseType, RequestType>*, 446 void*)> 447 request_method_; 448 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_; 449 grpc::ServerAsyncReader<ResponseType, RequestType> stream_; 450 }; 451 452 class ServerRpcContextStreamingFromServerImpl final 453 : public ServerRpcContext { 454 public: 455 ServerRpcContextStreamingFromServerImpl( 456 std::function<void(ServerContextType*, RequestType*, 457 grpc::ServerAsyncWriter<ResponseType>*, void*)> 458 request_method, 459 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method) 460 : srv_ctx_(new ServerContextType), 461 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), 462 request_method_(request_method), 463 invoke_method_(invoke_method), 464 stream_(srv_ctx_.get()) { 465 request_method_(srv_ctx_.get(), &req_, &stream_, 466 AsyncQpsServerTest::tag(this)); 467 } 468 ~ServerRpcContextStreamingFromServerImpl() override {} 469 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } 470 void Reset() override { 471 srv_ctx_.reset(new ServerContextType); 472 req_ = RequestType(); 473 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get()); 474 475 // Then request the method 476 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done; 477 request_method_(srv_ctx_.get(), &req_, &stream_, 478 AsyncQpsServerTest::tag(this)); 479 } 480 481 private: 482 bool request_done(bool ok) { 483 if (!ok) { 484 return false; 485 } 486 // invoke the method 487 // Call the RPC processing function 488 grpc::Status status = invoke_method_(&req_, &response_); 489 490 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done; 491 stream_.Write(response_, AsyncQpsServerTest::tag(this)); 492 return true; 493 } 494 495 bool write_done(bool ok) { 496 if (ok) { 497 // Do another write! 498 // next_state_ is unchanged 499 stream_.Write(response_, AsyncQpsServerTest::tag(this)); 500 } else { // must be done so let's finish 501 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done; 502 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); 503 } 504 return true; 505 } 506 bool finish_done(bool ok) { return false; /* reset the context */ } 507 508 std::unique_ptr<ServerContextType> srv_ctx_; 509 RequestType req_; 510 ResponseType response_; 511 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool); 512 std::function<void(ServerContextType*, RequestType*, 513 grpc::ServerAsyncWriter<ResponseType>*, void*)> 514 request_method_; 515 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_; 516 grpc::ServerAsyncWriter<ResponseType> stream_; 517 }; 518 519 std::vector<std::thread> threads_; 520 std::unique_ptr<grpc::Server> server_; 521 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; 522 std::vector<int> cq_; 523 ServiceType async_service_; 524 std::vector<std::unique_ptr<ServerRpcContext>> contexts_; 525 526 struct PerThreadShutdownState { 527 mutable std::mutex mutex; 528 bool shutdown; 529 PerThreadShutdownState() : shutdown(false) {} 530 }; 531 532 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; 533 }; 534 535 static void RegisterBenchmarkService(ServerBuilder* builder, 536 BenchmarkService::AsyncService* service) { 537 builder->RegisterService(service); 538 } 539 static void RegisterGenericService(ServerBuilder* builder, 540 grpc::AsyncGenericService* service) { 541 builder->RegisterAsyncGenericService(service); 542 } 543 544 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request, 545 SimpleResponse* response) { 546 if (request->response_size() > 0) { 547 if (!Server::SetPayload(request->response_type(), request->response_size(), 548 response->mutable_payload())) { 549 return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); 550 } 551 } 552 // We are done using the request. Clear it to reduce working memory. 553 // This proves to reduce cache misses in large message size cases. 554 request->Clear(); 555 return Status::OK; 556 } 557 558 static Status ProcessGenericRPC(const PayloadConfig& payload_config, 559 ByteBuffer* request, ByteBuffer* response) { 560 // We are done using the request. Clear it to reduce working memory. 561 // This proves to reduce cache misses in large message size cases. 562 request->Clear(); 563 int resp_size = payload_config.bytebuf_params().resp_size(); 564 std::unique_ptr<char[]> buf(new char[resp_size]); 565 Slice slice(buf.get(), resp_size); 566 *response = ByteBuffer(&slice, 1); 567 return Status::OK; 568 } 569 570 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) { 571 return std::unique_ptr<Server>( 572 new AsyncQpsServerTest<SimpleRequest, SimpleResponse, 573 BenchmarkService::AsyncService, 574 grpc::ServerContext>( 575 config, RegisterBenchmarkService, 576 &BenchmarkService::AsyncService::RequestUnaryCall, 577 &BenchmarkService::AsyncService::RequestStreamingCall, 578 &BenchmarkService::AsyncService::RequestStreamingFromClient, 579 &BenchmarkService::AsyncService::RequestStreamingFromServer, 580 &BenchmarkService::AsyncService::RequestStreamingBothWays, 581 ProcessSimpleRPC)); 582 } 583 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) { 584 return std::unique_ptr<Server>( 585 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService, 586 grpc::GenericServerContext>( 587 config, RegisterGenericService, nullptr, 588 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr, 589 ProcessGenericRPC)); 590 } 591 592 } // namespace testing 593 } // namespace grpc 594