Home | History | Annotate | Download | only in qps
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <algorithm>
     20 #include <forward_list>
     21 #include <functional>
     22 #include <memory>
     23 #include <mutex>
     24 #include <thread>
     25 
     26 #include <grpc/grpc.h>
     27 #include <grpc/support/alloc.h>
     28 #include <grpc/support/log.h>
     29 #include <grpcpp/generic/async_generic_service.h>
     30 #include <grpcpp/resource_quota.h>
     31 #include <grpcpp/security/server_credentials.h>
     32 #include <grpcpp/server.h>
     33 #include <grpcpp/server_builder.h>
     34 #include <grpcpp/server_context.h>
     35 #include <grpcpp/support/config.h>
     36 
     37 #include "src/core/lib/gpr/host_port.h"
     38 #include "src/core/lib/surface/completion_queue.h"
     39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
     40 #include "test/core/util/test_config.h"
     41 #include "test/cpp/qps/qps_server_builder.h"
     42 #include "test/cpp/qps/server.h"
     43 
     44 namespace grpc {
     45 namespace testing {
     46 
     47 template <class RequestType, class ResponseType, class ServiceType,
     48           class ServerContextType>
     49 class AsyncQpsServerTest final : public grpc::testing::Server {
     50  public:
     51   AsyncQpsServerTest(
     52       const ServerConfig& config,
     53       std::function<void(ServerBuilder*, ServiceType*)> register_service,
     54       std::function<void(ServiceType*, ServerContextType*, RequestType*,
     55                          ServerAsyncResponseWriter<ResponseType>*,
     56                          CompletionQueue*, ServerCompletionQueue*, void*)>
     57           request_unary_function,
     58       std::function<void(ServiceType*, ServerContextType*,
     59                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
     60                          CompletionQueue*, ServerCompletionQueue*, void*)>
     61           request_streaming_function,
     62       std::function<void(ServiceType*, ServerContextType*,
     63                          ServerAsyncReader<ResponseType, RequestType>*,
     64                          CompletionQueue*, ServerCompletionQueue*, void*)>
     65           request_streaming_from_client_function,
     66       std::function<void(ServiceType*, ServerContextType*, RequestType*,
     67                          ServerAsyncWriter<ResponseType>*, CompletionQueue*,
     68                          ServerCompletionQueue*, void*)>
     69           request_streaming_from_server_function,
     70       std::function<void(ServiceType*, ServerContextType*,
     71                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
     72                          CompletionQueue*, ServerCompletionQueue*, void*)>
     73           request_streaming_both_ways_function,
     74       std::function<grpc::Status(const PayloadConfig&, RequestType*,
     75                                  ResponseType*)>
     76           process_rpc)
     77       : Server(config) {
     78     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
     79 
     80     auto port_num = port();
     81     // Negative port number means inproc server, so no listen port needed
     82     if (port_num >= 0) {
     83       char* server_address = nullptr;
     84       gpr_join_host_port(&server_address, "::", port_num);
     85       builder->AddListeningPort(server_address,
     86                                 Server::CreateServerCredentials(config));
     87       gpr_free(server_address);
     88     }
     89 
     90     register_service(builder.get(), &async_service_);
     91 
     92     int num_threads = config.async_server_threads();
     93     if (num_threads <= 0) {  // dynamic sizing
     94       num_threads = cores();
     95       gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
     96     }
     97 
     98     int tpc = std::max(1, config.threads_per_cq());  // 1 if unspecified
     99     int num_cqs = (num_threads + tpc - 1) / tpc;     // ceiling operator
    100     for (int i = 0; i < num_cqs; i++) {
    101       srv_cqs_.emplace_back(builder->AddCompletionQueue());
    102     }
    103     for (int i = 0; i < num_threads; i++) {
    104       cq_.emplace_back(i % srv_cqs_.size());
    105     }
    106 
    107     ApplyConfigToBuilder(config, builder.get());
    108 
    109     server_ = builder->BuildAndStart();
    110 
    111     auto process_rpc_bound =
    112         std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
    113                   std::placeholders::_2);
    114 
    115     for (int i = 0; i < 5000; i++) {
    116       for (int j = 0; j < num_cqs; j++) {
    117         if (request_unary_function) {
    118           auto request_unary = std::bind(
    119               request_unary_function, &async_service_, std::placeholders::_1,
    120               std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
    121               srv_cqs_[j].get(), std::placeholders::_4);
    122           contexts_.emplace_back(
    123               new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
    124         }
    125         if (request_streaming_function) {
    126           auto request_streaming = std::bind(
    127               request_streaming_function, &async_service_,
    128               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
    129               srv_cqs_[j].get(), std::placeholders::_3);
    130           contexts_.emplace_back(new ServerRpcContextStreamingImpl(
    131               request_streaming, process_rpc_bound));
    132         }
    133         if (request_streaming_from_client_function) {
    134           auto request_streaming_from_client = std::bind(
    135               request_streaming_from_client_function, &async_service_,
    136               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
    137               srv_cqs_[j].get(), std::placeholders::_3);
    138           contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
    139               request_streaming_from_client, process_rpc_bound));
    140         }
    141         if (request_streaming_from_server_function) {
    142           auto request_streaming_from_server =
    143               std::bind(request_streaming_from_server_function, &async_service_,
    144                         std::placeholders::_1, std::placeholders::_2,
    145                         std::placeholders::_3, srv_cqs_[j].get(),
    146                         srv_cqs_[j].get(), std::placeholders::_4);
    147           contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
    148               request_streaming_from_server, process_rpc_bound));
    149         }
    150         if (request_streaming_both_ways_function) {
    151           // TODO(vjpai): Add this code
    152         }
    153       }
    154     }
    155 
    156     for (int i = 0; i < num_threads; i++) {
    157       shutdown_state_.emplace_back(new PerThreadShutdownState());
    158       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
    159     }
    160   }
    161   ~AsyncQpsServerTest() {
    162     for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
    163       std::lock_guard<std::mutex> lock((*ss)->mutex);
    164       (*ss)->shutdown = true;
    165     }
    166     std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this);
    167     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
    168       (*cq)->Shutdown();
    169     }
    170     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
    171       thr->join();
    172     }
    173     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
    174       bool ok;
    175       void* got_tag;
    176       while ((*cq)->Next(&got_tag, &ok))
    177         ;
    178     }
    179     shutdown_thread.join();
    180   }
    181 
    182   int GetPollCount() override {
    183     int count = 0;
    184     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
    185       count += grpc_get_cq_poll_num((*cq)->cq());
    186     }
    187     return count;
    188   }
    189 
    190   std::shared_ptr<Channel> InProcessChannel(
    191       const ChannelArguments& args) override {
    192     return server_->InProcessChannel(args);
    193   }
    194 
    195  private:
    196   void ShutdownThreadFunc() {
    197     // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
    198     auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
    199     server_->Shutdown(deadline);
    200   }
    201 
    202   void ThreadFunc(int thread_idx) {
    203     // Wait until work is available or we are shutting down
    204     bool ok;
    205     void* got_tag;
    206     if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
    207       return;
    208     }
    209     ServerRpcContext* ctx;
    210     std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
    211     do {
    212       ctx = detag(got_tag);
    213       // The tag is a pointer to an RPC context to invoke
    214       // Proceed while holding a lock to make sure that
    215       // this thread isn't supposed to shut down
    216       mu_ptr->lock();
    217       if (shutdown_state_[thread_idx]->shutdown) {
    218         mu_ptr->unlock();
    219         return;
    220       }
    221     } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
    222         [&, ctx, ok, mu_ptr]() {
    223           ctx->lock();
    224           if (!ctx->RunNextState(ok)) {
    225             ctx->Reset();
    226           }
    227           ctx->unlock();
    228           mu_ptr->unlock();
    229         },
    230         &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
    231   }
    232 
    233   class ServerRpcContext {
    234    public:
    235     ServerRpcContext() {}
    236     void lock() { mu_.lock(); }
    237     void unlock() { mu_.unlock(); }
    238     virtual ~ServerRpcContext(){};
    239     virtual bool RunNextState(bool) = 0;  // next state, return false if done
    240     virtual void Reset() = 0;             // start this back at a clean state
    241    private:
    242     std::mutex mu_;
    243   };
    244   static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
    245   static ServerRpcContext* detag(void* tag) {
    246     return static_cast<ServerRpcContext*>(tag);
    247   }
    248 
    249   class ServerRpcContextUnaryImpl final : public ServerRpcContext {
    250    public:
    251     ServerRpcContextUnaryImpl(
    252         std::function<void(ServerContextType*, RequestType*,
    253                            grpc::ServerAsyncResponseWriter<ResponseType>*,
    254                            void*)>
    255             request_method,
    256         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
    257         : srv_ctx_(new ServerContextType),
    258           next_state_(&ServerRpcContextUnaryImpl::invoker),
    259           request_method_(request_method),
    260           invoke_method_(invoke_method),
    261           response_writer_(srv_ctx_.get()) {
    262       request_method_(srv_ctx_.get(), &req_, &response_writer_,
    263                       AsyncQpsServerTest::tag(this));
    264     }
    265     ~ServerRpcContextUnaryImpl() override {}
    266     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
    267     void Reset() override {
    268       srv_ctx_.reset(new ServerContextType);
    269       req_ = RequestType();
    270       response_writer_ =
    271           grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
    272 
    273       // Then request the method
    274       next_state_ = &ServerRpcContextUnaryImpl::invoker;
    275       request_method_(srv_ctx_.get(), &req_, &response_writer_,
    276                       AsyncQpsServerTest::tag(this));
    277     }
    278 
    279    private:
    280     bool finisher(bool) { return false; }
    281     bool invoker(bool ok) {
    282       if (!ok) {
    283         return false;
    284       }
    285 
    286       // Call the RPC processing function
    287       grpc::Status status = invoke_method_(&req_, &response_);
    288 
    289       // Have the response writer work and invoke on_finish when done
    290       next_state_ = &ServerRpcContextUnaryImpl::finisher;
    291       response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
    292       return true;
    293     }
    294     std::unique_ptr<ServerContextType> srv_ctx_;
    295     RequestType req_;
    296     ResponseType response_;
    297     bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
    298     std::function<void(ServerContextType*, RequestType*,
    299                        grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
    300         request_method_;
    301     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
    302     grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
    303   };
    304 
    305   class ServerRpcContextStreamingImpl final : public ServerRpcContext {
    306    public:
    307     ServerRpcContextStreamingImpl(
    308         std::function<void(
    309             ServerContextType*,
    310             grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
    311             request_method,
    312         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
    313         : srv_ctx_(new ServerContextType),
    314           next_state_(&ServerRpcContextStreamingImpl::request_done),
    315           request_method_(request_method),
    316           invoke_method_(invoke_method),
    317           stream_(srv_ctx_.get()) {
    318       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
    319     }
    320     ~ServerRpcContextStreamingImpl() override {}
    321     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
    322     void Reset() override {
    323       srv_ctx_.reset(new ServerContextType);
    324       req_ = RequestType();
    325       stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
    326           srv_ctx_.get());
    327 
    328       // Then request the method
    329       next_state_ = &ServerRpcContextStreamingImpl::request_done;
    330       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
    331     }
    332 
    333    private:
    334     bool request_done(bool ok) {
    335       if (!ok) {
    336         return false;
    337       }
    338       next_state_ = &ServerRpcContextStreamingImpl::read_done;
    339       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
    340       return true;
    341     }
    342 
    343     bool read_done(bool ok) {
    344       if (ok) {
    345         // invoke the method
    346         // Call the RPC processing function
    347         grpc::Status status = invoke_method_(&req_, &response_);
    348         // initiate the write
    349         next_state_ = &ServerRpcContextStreamingImpl::write_done;
    350         stream_.Write(response_, AsyncQpsServerTest::tag(this));
    351       } else {  // client has sent writes done
    352         // finish the stream
    353         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
    354         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
    355       }
    356       return true;
    357     }
    358     bool write_done(bool ok) {
    359       // now go back and get another streaming read!
    360       if (ok) {
    361         next_state_ = &ServerRpcContextStreamingImpl::read_done;
    362         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
    363       } else {
    364         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
    365         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
    366       }
    367       return true;
    368     }
    369     bool finish_done(bool ok) { return false; /* reset the context */ }
    370 
    371     std::unique_ptr<ServerContextType> srv_ctx_;
    372     RequestType req_;
    373     ResponseType response_;
    374     bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
    375     std::function<void(
    376         ServerContextType*,
    377         grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
    378         request_method_;
    379     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
    380     grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
    381   };
    382 
    383   class ServerRpcContextStreamingFromClientImpl final
    384       : public ServerRpcContext {
    385    public:
    386     ServerRpcContextStreamingFromClientImpl(
    387         std::function<void(ServerContextType*,
    388                            grpc::ServerAsyncReader<ResponseType, RequestType>*,
    389                            void*)>
    390             request_method,
    391         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
    392         : srv_ctx_(new ServerContextType),
    393           next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
    394           request_method_(request_method),
    395           invoke_method_(invoke_method),
    396           stream_(srv_ctx_.get()) {
    397       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
    398     }
    399     ~ServerRpcContextStreamingFromClientImpl() override {}
    400     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
    401     void Reset() override {
    402       srv_ctx_.reset(new ServerContextType);
    403       req_ = RequestType();
    404       stream_ =
    405           grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
    406 
    407       // Then request the method
    408       next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
    409       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
    410     }
    411 
    412    private:
    413     bool request_done(bool ok) {
    414       if (!ok) {
    415         return false;
    416       }
    417       next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
    418       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
    419       return true;
    420     }
    421 
    422     bool read_done(bool ok) {
    423       if (ok) {
    424         // In this case, just do another read
    425         // next_state_ is unchanged
    426         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
    427         return true;
    428       } else {  // client has sent writes done
    429         // invoke the method
    430         // Call the RPC processing function
    431         grpc::Status status = invoke_method_(&req_, &response_);
    432         // finish the stream
    433         next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
    434         stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
    435       }
    436       return true;
    437     }
    438     bool finish_done(bool ok) { return false; /* reset the context */ }
    439 
    440     std::unique_ptr<ServerContextType> srv_ctx_;
    441     RequestType req_;
    442     ResponseType response_;
    443     bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
    444     std::function<void(ServerContextType*,
    445                        grpc::ServerAsyncReader<ResponseType, RequestType>*,
    446                        void*)>
    447         request_method_;
    448     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
    449     grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
    450   };
    451 
    452   class ServerRpcContextStreamingFromServerImpl final
    453       : public ServerRpcContext {
    454    public:
    455     ServerRpcContextStreamingFromServerImpl(
    456         std::function<void(ServerContextType*, RequestType*,
    457                            grpc::ServerAsyncWriter<ResponseType>*, void*)>
    458             request_method,
    459         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
    460         : srv_ctx_(new ServerContextType),
    461           next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
    462           request_method_(request_method),
    463           invoke_method_(invoke_method),
    464           stream_(srv_ctx_.get()) {
    465       request_method_(srv_ctx_.get(), &req_, &stream_,
    466                       AsyncQpsServerTest::tag(this));
    467     }
    468     ~ServerRpcContextStreamingFromServerImpl() override {}
    469     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
    470     void Reset() override {
    471       srv_ctx_.reset(new ServerContextType);
    472       req_ = RequestType();
    473       stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
    474 
    475       // Then request the method
    476       next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
    477       request_method_(srv_ctx_.get(), &req_, &stream_,
    478                       AsyncQpsServerTest::tag(this));
    479     }
    480 
    481    private:
    482     bool request_done(bool ok) {
    483       if (!ok) {
    484         return false;
    485       }
    486       // invoke the method
    487       // Call the RPC processing function
    488       grpc::Status status = invoke_method_(&req_, &response_);
    489 
    490       next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
    491       stream_.Write(response_, AsyncQpsServerTest::tag(this));
    492       return true;
    493     }
    494 
    495     bool write_done(bool ok) {
    496       if (ok) {
    497         // Do another write!
    498         // next_state_ is unchanged
    499         stream_.Write(response_, AsyncQpsServerTest::tag(this));
    500       } else {  // must be done so let's finish
    501         next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
    502         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
    503       }
    504       return true;
    505     }
    506     bool finish_done(bool ok) { return false; /* reset the context */ }
    507 
    508     std::unique_ptr<ServerContextType> srv_ctx_;
    509     RequestType req_;
    510     ResponseType response_;
    511     bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
    512     std::function<void(ServerContextType*, RequestType*,
    513                        grpc::ServerAsyncWriter<ResponseType>*, void*)>
    514         request_method_;
    515     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
    516     grpc::ServerAsyncWriter<ResponseType> stream_;
    517   };
    518 
    519   std::vector<std::thread> threads_;
    520   std::unique_ptr<grpc::Server> server_;
    521   std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
    522   std::vector<int> cq_;
    523   ServiceType async_service_;
    524   std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
    525 
    526   struct PerThreadShutdownState {
    527     mutable std::mutex mutex;
    528     bool shutdown;
    529     PerThreadShutdownState() : shutdown(false) {}
    530   };
    531 
    532   std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
    533 };
    534 
    535 static void RegisterBenchmarkService(ServerBuilder* builder,
    536                                      BenchmarkService::AsyncService* service) {
    537   builder->RegisterService(service);
    538 }
    539 static void RegisterGenericService(ServerBuilder* builder,
    540                                    grpc::AsyncGenericService* service) {
    541   builder->RegisterAsyncGenericService(service);
    542 }
    543 
    544 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
    545                                SimpleResponse* response) {
    546   if (request->response_size() > 0) {
    547     if (!Server::SetPayload(request->response_type(), request->response_size(),
    548                             response->mutable_payload())) {
    549       return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
    550     }
    551   }
    552   // We are done using the request. Clear it to reduce working memory.
    553   // This proves to reduce cache misses in large message size cases.
    554   request->Clear();
    555   return Status::OK;
    556 }
    557 
    558 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
    559                                 ByteBuffer* request, ByteBuffer* response) {
    560   // We are done using the request. Clear it to reduce working memory.
    561   // This proves to reduce cache misses in large message size cases.
    562   request->Clear();
    563   int resp_size = payload_config.bytebuf_params().resp_size();
    564   std::unique_ptr<char[]> buf(new char[resp_size]);
    565   Slice slice(buf.get(), resp_size);
    566   *response = ByteBuffer(&slice, 1);
    567   return Status::OK;
    568 }
    569 
    570 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
    571   return std::unique_ptr<Server>(
    572       new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
    573                              BenchmarkService::AsyncService,
    574                              grpc::ServerContext>(
    575           config, RegisterBenchmarkService,
    576           &BenchmarkService::AsyncService::RequestUnaryCall,
    577           &BenchmarkService::AsyncService::RequestStreamingCall,
    578           &BenchmarkService::AsyncService::RequestStreamingFromClient,
    579           &BenchmarkService::AsyncService::RequestStreamingFromServer,
    580           &BenchmarkService::AsyncService::RequestStreamingBothWays,
    581           ProcessSimpleRPC));
    582 }
    583 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
    584   return std::unique_ptr<Server>(
    585       new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
    586                              grpc::GenericServerContext>(
    587           config, RegisterGenericService, nullptr,
    588           &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
    589           ProcessGenericRPC));
    590 }
    591 
    592 }  // namespace testing
    593 }  // namespace grpc
    594