// pdx_benchmarks: client/service benchmark tool for libpdx_default_transport.
      1 // Use ALWAYS at the tag level. Control is performed manually during command
      2 // line processing.
      3 #define ATRACE_TAG ATRACE_TAG_ALWAYS
      4 #include <utils/Trace.h>
      5 
      6 #include <base/files/file_util.h>
      7 #include <base/logging.h>
      8 #include <base/strings/string_split.h>
      9 #include <errno.h>
     10 #include <getopt.h>
     11 #include <pdx/client.h>
     12 #include <pdx/default_transport/client_channel_factory.h>
     13 #include <pdx/default_transport/service_endpoint.h>
     14 #include <pdx/rpc/buffer_wrapper.h>
     15 #include <pdx/rpc/default_initialization_allocator.h>
     16 #include <pdx/rpc/message_buffer.h>
     17 #include <pdx/rpc/remote_method.h>
     18 #include <pdx/rpc/serializable.h>
     19 #include <pdx/service.h>
     20 #include <sys/prctl.h>
     21 #include <time.h>
     22 #include <unistd.h>
     23 
     24 #include <atomic>
     25 #include <cstdlib>
     26 #include <functional>
     27 #include <future>
     28 #include <iomanip>
     29 #include <ios>
     30 #include <iostream>
     31 #include <memory>
     32 #include <numeric>
     33 #include <sstream>
     34 #include <string>
     35 #include <thread>
     36 #include <vector>
     37 
     38 using android::pdx::Channel;
     39 using android::pdx::ClientBase;
     40 using android::pdx::Endpoint;
     41 using android::pdx::ErrorStatus;
     42 using android::pdx::Message;
     43 using android::pdx::Service;
     44 using android::pdx::ServiceBase;
     45 using android::pdx::default_transport::ClientChannelFactory;
     46 using android::pdx::Status;
     47 using android::pdx::Transaction;
     48 using android::pdx::rpc::BufferWrapper;
     49 using android::pdx::rpc::DefaultInitializationAllocator;
     50 using android::pdx::rpc::MessageBuffer;
     51 using android::pdx::rpc::DispatchRemoteMethod;
     52 using android::pdx::rpc::RemoteMethodReturn;
     53 using android::pdx::rpc::ReplyBuffer;
     54 using android::pdx::rpc::Void;
     55 using android::pdx::rpc::WrapBuffer;
     56 
     57 namespace {
     58 
     59 constexpr size_t kMaxMessageSize = 4096 * 1024;
     60 
     61 std::string GetServicePath(const std::string& path, int instance_id) {
     62   return path + std::to_string(instance_id);
     63 }
     64 
     65 void SetThreadName(const std::string& name) {
     66   prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(name.c_str()), 0, 0, 0);
     67 }
     68 
     69 constexpr uint64_t kNanosPerSecond = 1000000000llu;
     70 
     71 uint64_t GetClockNs() {
     72   timespec t;
     73   clock_gettime(CLOCK_MONOTONIC, &t);
     74   return kNanosPerSecond * t.tv_sec + t.tv_nsec;
     75 }
     76 
     77 template <typename T>
     78 ssize_t ssizeof(const T&) {
     79   return static_cast<ssize_t>(sizeof(T));
     80 }
     81 
// Tracks per-task scheduler statistics sampled from /proc/<tid>/schedstat.
// Update() computes deltas since the previous call, so the accessors report
// usage over the most recent sampling interval rather than since task start.
class SchedStats {
 public:
  // Defaults to the calling thread's task id.
  SchedStats() : SchedStats(gettid()) {}
  SchedStats(pid_t task_id) : task_id_(task_id) {}
  SchedStats(const SchedStats&) = default;
  SchedStats& operator=(const SchedStats&) = default;

  // Re-reads /proc/<tid>/schedstat and refreshes the interval deltas.
  // The three whitespace-separated fields are treated as: cpu time (ns),
  // runqueue wait time (ns), and number of timeslices run.
  void Update() {
    const std::string stats_path =
        "/proc/" + std::to_string(task_id_) + "/schedstat";

    std::string line;
    base::ReadFileToString(base::FilePath{stats_path}, &line);
    std::vector<std::string> stats = base::SplitString(
        line, " ", base::TRIM_WHITESPACE, base::SPLIT_WANT_ALL);

    // schedstat must contain exactly three fields; anything else is fatal.
    CHECK_EQ(3u, stats.size());

    // Calculate the deltas since the last update. Each value is absolute since
    // the task started.
    uint64_t current_cpu_time_ns = std::stoull(stats[0]);
    uint64_t current_wait_ns = std::stoull(stats[1]);
    uint64_t current_timeslices = std::stoull(stats[2]);
    cpu_time_ns_ = current_cpu_time_ns - last_cpu_time_ns_;
    wait_ns_ = current_wait_ns - last_wait_ns_;
    timeslices_ = current_timeslices - last_timeslices_;
    last_cpu_time_ns_ = current_cpu_time_ns;
    last_wait_ns_ = current_wait_ns;
    last_timeslices_ = current_timeslices;
  }

  pid_t task_id() const { return task_id_; }
  uint64_t cpu_time_ns() const { return cpu_time_ns_; }
  uint64_t wait_ns() const { return wait_ns_; }
  uint64_t timeslices() const { return timeslices_; }

  // Interval values converted to seconds for reporting.
  double cpu_time_s() const {
    return static_cast<double>(cpu_time_ns_) / kNanosPerSecond;
  }
  double wait_s() const {
    return static_cast<double>(wait_ns_) / kNanosPerSecond;
  }

 private:
  // NOTE(review): stored as int32_t while the accessor returns pid_t —
  // presumably to give the serialized form a fixed width; confirm.
  int32_t task_id_;
  uint64_t cpu_time_ns_ = 0;
  uint64_t last_cpu_time_ns_ = 0;
  uint64_t wait_ns_ = 0;
  uint64_t last_wait_ns_ = 0;
  uint64_t timeslices_ = 0;
  uint64_t last_timeslices_ = 0;

  // Only the current interval values (not the last_* snapshots) travel over
  // the wire when this object is serialized.
  PDX_SERIALIZABLE_MEMBERS(SchedStats, task_id_, cpu_time_ns_, wait_ns_,
                           timeslices_);
};
    137 
    138 // Opcodes for client/service protocol.
// Wire-protocol opcodes: values are implicit and sequential, so new ops must
// only be appended to keep client and service in agreement.
struct BenchmarkOps {
  enum : int {
    Nop,          // Empty round trip; measures framework overhead.
    Read,         // Service -> client payload transfer.
    Write,        // Client -> service payload transfer.
    Echo,         // Client -> service -> client payload round trip.
    Stats,        // Returns service-side timestamps and scheduler stats.
    WriteVector,  // Write variant using the RPC serialization layer.
    EchoVector,   // Echo variant using the RPC serialization layer.
    Quit,         // Cancels the service's dispatch loop.
  };
};
    151 
// RPC method signatures for the opcodes that use the serialization layer.
struct BenchmarkRPC {
  // Returns (message receive time ns, reply time ns, service SchedStats).
  PDX_REMOTE_METHOD(Stats, BenchmarkOps::Stats,
                    std::tuple<uint64_t, uint64_t, SchedStats>(Void));
  // Sends a byte vector to the service; returns the received byte count.
  PDX_REMOTE_METHOD(WriteVector, BenchmarkOps::WriteVector,
                    int(const BufferWrapper<std::vector<uint8_t>> data));
  // Round-trips a byte vector through the service.
  PDX_REMOTE_METHOD(EchoVector, BenchmarkOps::EchoVector,
                    BufferWrapper<std::vector<uint8_t>>(
                        const BufferWrapper<std::vector<uint8_t>> data));
};
    161 
// Per-thread results reported by client worker threads when they finish.
struct BenchmarkResult {
  int thread_id = 0;            // Client thread that produced this result.
  int service_id = 0;           // Service instance the thread targeted.
  double time_delta_s = 0.0;    // Wall-clock duration of the run in seconds.
  uint64_t bytes_sent = 0;      // Total payload bytes moved by the thread.
  SchedStats sched_stats = {};  // Scheduler stat deltas for the thread.
};
    169 
    170 // Global command line option values.
struct Options {
  bool verbose = false;             // --verbose: enable verbose logging.
  int threads = 1;                  // --threads: worker threads per instance.
  int opcode = BenchmarkOps::Read;  // --op: benchmark operation to run.
  int blocksize = 1;                // --bs: payload size in bytes.
  int count = 1;                    // --count: iterations per client thread.
  int instances = 1;                // --instances: service instance count.
  int timeout = 1;                  // --timeout: client transaction timeout
                                    //   (units per ClientBase; TODO confirm).
  int warmup = 0;                   // --warmup: busy-loop warmup rounds.
} ProgramOptions;
    181 
    182 // Command line option names.
    183 const char kOptionService[] = "service";
    184 const char kOptionClient[] = "client";
    185 const char kOptionVerbose[] = "verbose";
    186 const char kOptionOpcode[] = "op";
    187 const char kOptionBlocksize[] = "bs";
    188 const char kOptionCount[] = "count";
    189 const char kOptionThreads[] = "threads";
    190 const char kOptionInstances[] = "instances";
    191 const char kOptionTimeout[] = "timeout";
    192 const char kOptionTrace[] = "trace";
    193 const char kOptionWarmup[] = "warmup";
    194 
    195 // getopt() long options.
    196 static option long_options[] = {
    197     {kOptionService, required_argument, 0, 0},
    198     {kOptionClient, required_argument, 0, 0},
    199     {kOptionVerbose, no_argument, 0, 0},
    200     {kOptionOpcode, required_argument, 0, 0},
    201     {kOptionBlocksize, required_argument, 0, 0},
    202     {kOptionCount, required_argument, 0, 0},
    203     {kOptionThreads, required_argument, 0, 0},
    204     {kOptionInstances, required_argument, 0, 0},
    205     {kOptionTimeout, required_argument, 0, 0},
    206     {kOptionTrace, no_argument, 0, 0},
    207     {kOptionWarmup, required_argument, 0, 0},
    208     {0, 0, 0, 0},
    209 };
    210 
    211 // Parses the argument for kOptionOpcode and sets the value of
    212 // ProgramOptions.opcode.
    213 void ParseOpcodeOption(const std::string& argument) {
    214   if (argument == "read") {
    215     ProgramOptions.opcode = BenchmarkOps::Read;
    216   } else if (argument == "write") {
    217     ProgramOptions.opcode = BenchmarkOps::Write;
    218   } else if (argument == "echo") {
    219     ProgramOptions.opcode = BenchmarkOps::Echo;
    220   } else if (argument == "writevec") {
    221     ProgramOptions.opcode = BenchmarkOps::WriteVector;
    222   } else if (argument == "echovec") {
    223     ProgramOptions.opcode = BenchmarkOps::EchoVector;
    224   } else if (argument == "quit") {
    225     ProgramOptions.opcode = BenchmarkOps::Quit;
    226   } else if (argument == "nop") {
    227     ProgramOptions.opcode = BenchmarkOps::Nop;
    228   } else if (argument == "stats") {
    229     ProgramOptions.opcode = BenchmarkOps::Stats;
    230   } else {
    231     ProgramOptions.opcode = std::stoi(argument);
    232   }
    233 }
    234 
    235 // Implements the service side of the benchmark.
// One instance owns an endpoint; ServiceCommand may run several dispatch
// threads against the same instance, so see the buffer note below.
class BenchmarkService : public ServiceBase<BenchmarkService> {
 public:
  // Accepts every connection; no per-channel state is allocated.
  std::shared_ptr<Channel> OnChannelOpen(Message& message) override {
    // NOTE(review): log tag says "OnChannelCreate" but this method is
    // OnChannelOpen — log-only mismatch.
    VLOG(1) << "BenchmarkService::OnChannelCreate: cid="
            << message.GetChannelId();
    return nullptr;
  }

  void OnChannelClose(Message& message,
                      const std::shared_ptr<Channel>& /*channel*/) override {
    VLOG(1) << "BenchmarkService::OnChannelClose: cid="
            << message.GetChannelId();
  }

  // Handles one benchmark request according to its opcode. Reply failures
  // are fatal (CHECK): the benchmark cannot continue meaningfully after a
  // transport error.
  Status<void> HandleMessage(Message& message) override {
    ATRACE_NAME("BenchmarkService::HandleMessage");

    switch (message.GetOp()) {
      // Empty round trip: reply immediately with 0.
      case BenchmarkOps::Nop:
        VLOG(1) << "BenchmarkService::HandleMessage: op=nop";
        {
          ATRACE_NAME("Reply");
          CHECK(message.Reply(0));
        }
        return {};

      // Client -> service transfer: consume the payload and reply with the
      // number of bytes read (or an error status).
      case BenchmarkOps::Write: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=write send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetSendLength())
          status = message.ReadAll(send_buffer.data(), message.GetSendLength());

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetSendLength()));
        }
        return {};
      }

      // Service -> client transfer: fill the client's receive buffer and
      // reply with the number of bytes written.
      case BenchmarkOps::Read: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=read send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetReceiveLength()) {
          status = message.WriteAll(receive_buffer.data(),
                                    message.GetReceiveLength());
        }

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetReceiveLength()));
        }
        return {};
      }

      // Round trip: read the payload, then write the same bytes back.
      case BenchmarkOps::Echo: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetSendLength())
          status = message.ReadAll(send_buffer.data(), message.GetSendLength());

        // A failed read is reported immediately; no echo is attempted.
        if (!status) {
          CHECK(message.ReplyError(status.error()));
          return {};
        }

        if (message.GetSendLength()) {
          status =
              message.WriteAll(send_buffer.data(), message.GetSendLength());
        }

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetSendLength()));
        }
        return {};
      }

      // Snapshot timing and scheduler statistics for this dispatch thread
      // and return them through the RPC serialization layer.
      case BenchmarkOps::Stats: {
        // NOTE(review): log says "op=echo"; presumably meant "op=stats".
        VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        // Snapshot the stats when the message is received.
        const uint64_t receive_time_ns = GetClockNs();
        sched_stats_.Update();

        // Use the RPC system to return the results. The second timestamp is
        // taken at reply time so the client can see service-side latency.
        RemoteMethodReturn<BenchmarkRPC::Stats>(
            message, BenchmarkRPC::Stats::Return{receive_time_ns, GetClockNs(),
                                                 sched_stats_});
        return {};
      }

      // Serialized write: dispatched to OnWriteVector below.
      case BenchmarkOps::WriteVector:
        VLOG(1) << "BenchmarkService::HandleMessage: op=writevec send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        DispatchRemoteMethod<BenchmarkRPC::WriteVector>(
            *this, &BenchmarkService::OnWriteVector, message, kMaxMessageSize);
        return {};

      // Serialized echo: dispatched to OnEchoVector below.
      case BenchmarkOps::EchoVector:
        VLOG(1) << "BenchmarkService::HandleMessage: op=echovec send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        DispatchRemoteMethod<BenchmarkRPC::EchoVector>(
            *this, &BenchmarkService::OnEchoVector, message, kMaxMessageSize);
        return {};

      // Shut down: cancel the endpoint and propagate ESHUTDOWN so dispatch
      // threads exit their loops.
      case BenchmarkOps::Quit:
        Cancel();
        return ErrorStatus{ESHUTDOWN};

      // Unknown ops fall through to the framework's default handling.
      default:
        VLOG(1) << "BenchmarkService::HandleMessage: default case; op="
                << message.GetOp();
        return Service::DefaultHandleMessage(message);
    }
  }

  // Updates the scheduler stats from procfs for this thread.
  void UpdateSchedStats() { sched_stats_.Update(); }

 private:
  friend BASE;

  // Pre-sizes both scratch buffers to the maximum message size so request
  // handling never reallocates.
  BenchmarkService(std::unique_ptr<Endpoint> endpoint)
      : BASE("BenchmarkService", std::move(endpoint)),
        send_buffer(kMaxMessageSize),
        receive_buffer(kMaxMessageSize) {}

  // Scratch payload buffers. NOTE(review): shared by all dispatch threads of
  // this instance without locking; contents are don't-care benchmark data,
  // so the race appears intentional — confirm.
  std::vector<uint8_t> send_buffer;
  std::vector<uint8_t> receive_buffer;

  // Each service thread has its own scheduler stats object.
  static thread_local SchedStats sched_stats_;

  using BufferType = BufferWrapper<
      std::vector<uint8_t, DefaultInitializationAllocator<uint8_t>>>;

  // RPC handlers for the vector variants: report the size, or move the
  // payload straight back to the caller.
  int OnWriteVector(Message&, const BufferType& data) { return data.size(); }
  BufferType OnEchoVector(Message&, BufferType&& data) {
    return std::move(data);
  }

  BenchmarkService(const BenchmarkService&) = delete;
  void operator=(const BenchmarkService&) = delete;
};

// Out-of-line definition for the per-thread stats member declared above.
thread_local SchedStats BenchmarkService::sched_stats_;
    406 
    407 // Implements the client side of the benchmark.
    408 class BenchmarkClient : public ClientBase<BenchmarkClient> {
    409  public:
    410   int Nop() {
    411     ATRACE_NAME("BenchmarkClient::Nop");
    412     VLOG(1) << "BenchmarkClient::Nop";
    413     Transaction transaction{*this};
    414     return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Nop));
    415   }
    416 
    417   int Write(const void* buffer, size_t length) {
    418     ATRACE_NAME("BenchmarkClient::Write");
    419     VLOG(1) << "BenchmarkClient::Write: buffer=" << buffer
    420             << " length=" << length;
    421     Transaction transaction{*this};
    422     return ReturnStatusOrError(
    423         transaction.Send<int>(BenchmarkOps::Write, buffer, length, nullptr, 0));
    424     // return write(endpoint_fd(), buffer, length);
    425   }
    426 
    427   int Read(void* buffer, size_t length) {
    428     ATRACE_NAME("BenchmarkClient::Read");
    429     VLOG(1) << "BenchmarkClient::Read: buffer=" << buffer
    430             << " length=" << length;
    431     Transaction transaction{*this};
    432     return ReturnStatusOrError(
    433         transaction.Send<int>(BenchmarkOps::Read, nullptr, 0, buffer, length));
    434     // return read(endpoint_fd(), buffer, length);
    435   }
    436 
    437   int Echo(const void* send_buffer, size_t send_length, void* receive_buffer,
    438            size_t receive_length) {
    439     ATRACE_NAME("BenchmarkClient::Echo");
    440     VLOG(1) << "BenchmarkClient::Echo: send_buffer=" << send_buffer
    441             << " send_length=" << send_length
    442             << " receive_buffer=" << receive_buffer
    443             << " receive_length=" << receive_length;
    444     Transaction transaction{*this};
    445     return ReturnStatusOrError(
    446         transaction.Send<int>(BenchmarkOps::Echo, send_buffer, send_length,
    447                               receive_buffer, receive_length));
    448   }
    449 
    450   int Stats(std::tuple<uint64_t, uint64_t, SchedStats>* stats_out) {
    451     ATRACE_NAME("BenchmarkClient::Stats");
    452     VLOG(1) << "BenchmarkClient::Stats";
    453 
    454     auto status = InvokeRemoteMethodInPlace<BenchmarkRPC::Stats>(stats_out);
    455     return status ? 0 : -status.error();
    456   }
    457 
    458   int WriteVector(const BufferWrapper<std::vector<uint8_t>>& data) {
    459     ATRACE_NAME("BenchmarkClient::Stats");
    460     VLOG(1) << "BenchmarkClient::Stats";
    461 
    462     auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
    463     return ReturnStatusOrError(status);
    464   }
    465 
    466   template <typename T>
    467   int WriteVector(const BufferWrapper<T>& data) {
    468     ATRACE_NAME("BenchmarkClient::WriteVector");
    469     VLOG(1) << "BenchmarkClient::WriteVector";
    470 
    471     auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
    472     return ReturnStatusOrError(status);
    473   }
    474 
    475   template <typename T, typename U>
    476   int EchoVector(const BufferWrapper<T>& data, BufferWrapper<U>* data_out) {
    477     ATRACE_NAME("BenchmarkClient::EchoVector");
    478     VLOG(1) << "BenchmarkClient::EchoVector";
    479 
    480     MessageBuffer<ReplyBuffer>::Reserve(kMaxMessageSize - 1);
    481     auto status =
    482         InvokeRemoteMethodInPlace<BenchmarkRPC::EchoVector>(data_out, data);
    483     return status ? 0 : -status.error();
    484   }
    485 
    486   int Quit() {
    487     VLOG(1) << "BenchmarkClient::Quit";
    488     Transaction transaction{*this};
    489     return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Echo));
    490   }
    491 
    492  private:
    493   friend BASE;
    494 
    495   BenchmarkClient(const std::string& service_path)
    496       : BASE(ClientChannelFactory::Create(service_path),
    497              ProgramOptions.timeout) {}
    498 
    499   BenchmarkClient(const BenchmarkClient&) = delete;
    500   void operator=(const BenchmarkClient&) = delete;
    501 };
    502 
    503 // Creates a benchmark service at |path| and dispatches messages.
// Creates ProgramOptions.instances service endpoints at "<path><instance>"
// and runs ProgramOptions.threads dispatch threads per instance. Blocks
// until every dispatch thread exits (normally after a Quit request).
int ServiceCommand(const std::string& path) {
  if (path.empty())
    return -EINVAL;

  // Start the requested number of dispatch threads.
  std::vector<std::thread> dispatch_threads;
  int service_count = ProgramOptions.instances;
  int service_id_counter = 0;
  int thread_id_counter = 0;
  // Set by any thread (or a failed instance creation) to stop all threads.
  std::atomic<bool> done(false);

  while (service_count--) {
    std::cerr << "Starting service instance " << service_id_counter
              << std::endl;
    // Each instance binds its own blocking endpoint socket.
    auto service = BenchmarkService::Create(
        android::pdx::default_transport::Endpoint::CreateAndBindSocket(
            GetServicePath(path, service_id_counter),
            android::pdx::default_transport::Endpoint::kBlocking));
    if (!service) {
      std::cerr << "Failed to create service instance!!" << std::endl;
      done = true;
      break;
    }

    int thread_count = ProgramOptions.threads;
    while (thread_count--) {
      std::cerr << "Starting dispatch thread " << thread_id_counter
                << " service " << service_id_counter << std::endl;

      // |done| is captured by reference; this is safe because every thread
      // is joined before this function returns. The ids and the service
      // pointer are passed by value as explicit lambda arguments.
      dispatch_threads.emplace_back(
          [&](const int thread_id, const int service_id,
              const std::shared_ptr<BenchmarkService>& local_service) {
            SetThreadName("service" + std::to_string(service_id));

            // Read the initial schedstats for this thread from procfs.
            local_service->UpdateSchedStats();

            ATRACE_NAME("BenchmarkService::Dispatch");
            while (!done) {
              auto ret = local_service->ReceiveAndDispatch();
              if (!ret) {
                // ESHUTDOWN is the expected result of a Quit request; any
                // other error is reported. Either way this thread signals
                // all others to stop and exits.
                if (ret.error() != ESHUTDOWN) {
                  std::cerr << "Error while dispatching message on thread "
                            << thread_id << " service " << service_id << ": "
                            << ret.GetErrorMessage() << std::endl;
                } else {
                  std::cerr << "Quitting thread " << thread_id << " service "
                            << service_id << std::endl;
                }
                done = true;
                return;
              }
            }
          },
          thread_id_counter++, service_id_counter, service);
    }

    service_id_counter++;
  }

  // Wait for the dispatch threads to exit.
  for (auto& thread : dispatch_threads) {
    thread.join();
  }

  return 0;
}
    571 
    572 int ClientCommand(const std::string& path) {
    573   // Start the requested number of client threads.
    574   std::vector<std::thread> client_threads;
    575   std::vector<std::future<BenchmarkResult>> client_results;
    576   int service_count = ProgramOptions.instances;
    577   int thread_id_counter = 0;
    578   int service_id_counter = 0;
    579 
    580   // Aggregate statistics, updated when worker threads exit.
    581   std::atomic<uint64_t> total_bytes(0);
    582   std::atomic<uint64_t> total_time_ns(0);
    583 
    584   // Samples for variance calculation.
    585   std::vector<uint64_t> latency_samples_ns(
    586       ProgramOptions.instances * ProgramOptions.threads * ProgramOptions.count);
    587   const size_t samples_per_thread = ProgramOptions.count;
    588 
    589   std::vector<uint8_t> send_buffer(ProgramOptions.blocksize);
    590   std::vector<uint8_t> receive_buffer(kMaxMessageSize);
    591 
    592   // Barriers for synchronizing thread start.
    593   std::vector<std::future<void>> ready_barrier_futures;
    594   std::promise<void> go_barrier_promise;
    595   std::future<void> go_barrier_future = go_barrier_promise.get_future();
    596 
    597   // Barrier for synchronizing thread tear down.
    598   std::promise<void> done_barrier_promise;
    599   std::future<void> done_barrier_future = done_barrier_promise.get_future();
    600 
    601   while (service_count--) {
    602     int thread_count = ProgramOptions.threads;
    603     while (thread_count--) {
    604       std::cerr << "Starting client thread " << thread_id_counter << " service "
    605                 << service_id_counter << std::endl;
    606 
    607       std::promise<BenchmarkResult> result_promise;
    608       client_results.push_back(result_promise.get_future());
    609 
    610       std::promise<void> ready_barrier_promise;
    611       ready_barrier_futures.push_back(ready_barrier_promise.get_future());
    612 
    613       client_threads.emplace_back(
    614           [&](const int thread_id, const int service_id,
    615               std::promise<BenchmarkResult> result, std::promise<void> ready) {
    616             SetThreadName("client" + std::to_string(thread_id) + "/" +
    617                           std::to_string(service_id));
    618 
    619             ATRACE_NAME("BenchmarkClient::Dispatch");
    620 
    621             auto client =
    622                 BenchmarkClient::Create(GetServicePath(path, service_id));
    623             if (!client) {
    624               std::cerr << "Failed to create client for service " << service_id
    625                         << std::endl;
    626               return -ENOMEM;
    627             }
    628 
    629             uint64_t* thread_samples =
    630                 &latency_samples_ns[samples_per_thread * thread_id];
    631 
    632             // Per-thread statistics.
    633             uint64_t bytes_sent = 0;
    634             uint64_t time_start_ns;
    635             uint64_t time_end_ns;
    636             SchedStats sched_stats;
    637 
    638             // Signal ready and wait for go.
    639             ready.set_value();
    640             go_barrier_future.wait();
    641 
    642             // Warmup the scheduler.
    643             int warmup = ProgramOptions.warmup;
    644             while (warmup--) {
    645               for (int i = 0; i < 1000000; i++)
    646                 ;
    647             }
    648 
    649             sched_stats.Update();
    650             time_start_ns = GetClockNs();
    651 
    652             int count = ProgramOptions.count;
    653             while (count--) {
    654               uint64_t iteration_start_ns = GetClockNs();
    655 
    656               switch (ProgramOptions.opcode) {
    657                 case BenchmarkOps::Nop: {
    658                   const int ret = client->Nop();
    659                   if (ret < 0) {
    660                     std::cerr << "Failed to send nop: " << strerror(-ret)
    661                               << std::endl;
    662                     return ret;
    663                   } else {
    664                     VLOG(1) << "Success";
    665                   }
    666                   break;
    667                 }
    668 
    669                 case BenchmarkOps::Read: {
    670                   const int ret = client->Read(receive_buffer.data(),
    671                                                ProgramOptions.blocksize);
    672                   if (ret < 0) {
    673                     std::cerr << "Failed to read: " << strerror(-ret)
    674                               << std::endl;
    675                     return ret;
    676                   } else if (ret != ProgramOptions.blocksize) {
    677                     std::cerr << "Expected ret=" << ProgramOptions.blocksize
    678                               << "; actual ret=" << ret << std::endl;
    679                     return -EINVAL;
    680                   } else {
    681                     VLOG(1) << "Success";
    682                     bytes_sent += ret;
    683                   }
    684                   break;
    685                 }
    686 
    687                 case BenchmarkOps::Write: {
    688                   const int ret =
    689                       client->Write(send_buffer.data(), send_buffer.size());
    690                   if (ret < 0) {
    691                     std::cerr << "Failed to write: " << strerror(-ret)
    692                               << std::endl;
    693                     return ret;
    694                   } else if (ret != ProgramOptions.blocksize) {
    695                     std::cerr << "Expected ret=" << ProgramOptions.blocksize
    696                               << "; actual ret=" << ret << std::endl;
    697                     return -EINVAL;
    698                   } else {
    699                     VLOG(1) << "Success";
    700                     bytes_sent += ret;
    701                   }
    702                   break;
    703                 }
    704 
    705                 case BenchmarkOps::Echo: {
    706                   const int ret = client->Echo(
    707                       send_buffer.data(), send_buffer.size(),
    708                       receive_buffer.data(), receive_buffer.size());
    709                   if (ret < 0) {
    710                     std::cerr << "Failed to echo: " << strerror(-ret)
    711                               << std::endl;
    712                     return ret;
    713                   } else if (ret != ProgramOptions.blocksize) {
    714                     std::cerr << "Expected ret=" << ProgramOptions.blocksize
    715                               << "; actual ret=" << ret << std::endl;
    716                     return -EINVAL;
    717                   } else {
    718                     VLOG(1) << "Success";
    719                     bytes_sent += ret * 2;
    720                   }
    721                   break;
    722                 }
    723 
    724                 case BenchmarkOps::Stats: {
    725                   std::tuple<uint64_t, uint64_t, SchedStats> stats;
    726                   const int ret = client->Stats(&stats);
    727                   if (ret < 0) {
    728                     std::cerr << "Failed to get stats: " << strerror(-ret)
    729                               << std::endl;
    730                     return ret;
    731                   } else {
    732                     VLOG(1) << "Success";
    733                     std::cerr
    734                         << "Round trip: receive_time_ns=" << std::get<0>(stats)
    735                         << " reply_time_ns=" << std::get<1>(stats)
    736                         << " cpu_time_s=" << std::get<2>(stats).cpu_time_s()
    737                         << " wait_s=" << std::get<2>(stats).wait_s()
    738                         << std::endl;
    739                   }
    740                   break;
    741                 }
    742 
    743                 case BenchmarkOps::WriteVector: {
    744                   const int ret = client->WriteVector(
    745                       WrapBuffer(send_buffer.data(), ProgramOptions.blocksize));
    746                   if (ret < 0) {
    747                     std::cerr << "Failed to write vector: " << strerror(-ret)
    748                               << std::endl;
    749                     return ret;
    750                   } else {
    751                     VLOG(1) << "Success";
    752                     bytes_sent += ret;
    753                   }
    754                   break;
    755                 }
    756 
    757                 case BenchmarkOps::EchoVector: {
    758                   thread_local BufferWrapper<std::vector<
    759                       uint8_t, DefaultInitializationAllocator<uint8_t>>>
    760                       response_buffer;
    761                   const int ret = client->EchoVector(
    762                       WrapBuffer(send_buffer.data(), ProgramOptions.blocksize),
    763                       &response_buffer);
    764                   if (ret < 0) {
    765                     std::cerr << "Failed to echo vector: " << strerror(-ret)
    766                               << std::endl;
    767                     return ret;
    768                   } else {
    769                     VLOG(1) << "Success";
    770                     bytes_sent += send_buffer.size() + response_buffer.size();
    771                   }
    772                   break;
    773                 }
    774 
    775                 case BenchmarkOps::Quit: {
    776                   const int ret = client->Quit();
    777                   if (ret < 0 && ret != -ESHUTDOWN) {
    778                     std::cerr << "Failed to send quit: " << strerror(-ret);
    779                     return ret;
    780                   } else {
    781                     VLOG(1) << "Success";
    782                   }
    783                   break;
    784                 }
    785 
    786                 default:
    787                   std::cerr
    788                       << "Invalid client operation: " << ProgramOptions.opcode
    789                       << std::endl;
    790                   return -EINVAL;
    791               }
    792 
    793               uint64_t iteration_end_ns = GetClockNs();
    794               uint64_t iteration_delta_ns =
    795                   iteration_end_ns - iteration_start_ns;
    796               thread_samples[count] = iteration_delta_ns;
    797 
    798               if (iteration_delta_ns > (kNanosPerSecond / 100)) {
    799                 SchedStats stats = sched_stats;
    800                 stats.Update();
    801                 std::cerr << "Thread " << thread_id << " iteration_delta_s="
    802                           << (static_cast<double>(iteration_delta_ns) /
    803                               kNanosPerSecond)
    804                           << " " << stats.cpu_time_s() << " " << stats.wait_s()
    805                           << std::endl;
    806               }
    807             }
    808 
    809             time_end_ns = GetClockNs();
    810             sched_stats.Update();
    811 
    812             const double time_delta_s =
    813                 static_cast<double>(time_end_ns - time_start_ns) /
    814                 kNanosPerSecond;
    815 
    816             total_bytes += bytes_sent;
    817             total_time_ns += time_end_ns - time_start_ns;
    818 
    819             result.set_value(
    820                 {thread_id, service_id, time_delta_s, bytes_sent, sched_stats});
    821             done_barrier_future.wait();
    822 
    823             return 0;
    824           },
    825           thread_id_counter++, service_id_counter, std::move(result_promise),
    826           std::move(ready_barrier_promise));
    827     }
    828 
    829     service_id_counter++;
    830   }
    831 
    832   // Wait for workers to be ready.
    833   std::cerr << "Waiting for workers to be ready..." << std::endl;
    834   for (auto& ready : ready_barrier_futures)
    835     ready.wait();
    836 
    837   // Signal workers to go.
    838   std::cerr << "Kicking off benchmark." << std::endl;
    839   go_barrier_promise.set_value();
    840 
  // Wait for all the worker threads to finish.
    842   for (auto& result : client_results)
    843     result.wait();
    844 
    845   // Report worker thread results.
    846   for (auto& result : client_results) {
    847     BenchmarkResult benchmark_result = result.get();
    848     std::cerr << std::fixed << "Thread " << benchmark_result.thread_id
    849               << " service " << benchmark_result.service_id << ":" << std::endl;
    850     std::cerr << "\t " << benchmark_result.bytes_sent << " bytes in "
    851               << benchmark_result.time_delta_s << " seconds ("
    852               << std::setprecision(0) << (benchmark_result.bytes_sent / 1024.0 /
    853                                           benchmark_result.time_delta_s)
    854               << " K/s; " << std::setprecision(3)
    855               << (ProgramOptions.count / benchmark_result.time_delta_s)
    856               << " txn/s; " << std::setprecision(9)
    857               << (benchmark_result.time_delta_s / ProgramOptions.count)
    858               << " s/txn)" << std::endl;
    859     std::cerr << "\tStats: " << benchmark_result.sched_stats.cpu_time_s() << " "
    860               << (benchmark_result.sched_stats.cpu_time_s() /
    861                   ProgramOptions.count)
    862               << " " << benchmark_result.sched_stats.wait_s() << " "
    863               << (benchmark_result.sched_stats.wait_s() / ProgramOptions.count)
    864               << " " << benchmark_result.sched_stats.timeslices() << std::endl;
    865   }
    866 
    867   // Signal worker threads to exit.
    868   done_barrier_promise.set_value();
    869 
    870   // Wait for the worker threads to exit.
    871   for (auto& thread : client_threads) {
    872     thread.join();
    873   }
    874 
    875   // Report aggregate results.
    876   const int total_threads = ProgramOptions.threads * ProgramOptions.instances;
    877   const int iterations = ProgramOptions.count;
    878   const double total_time_s =
    879       static_cast<double>(total_time_ns) / kNanosPerSecond;
  // This is about how much wall time it took to completely transfer all the
  // payloads.
    882   const double average_time_s = total_time_s / total_threads;
    883 
    884   const uint64_t min_sample_time_ns =
    885       *std::min_element(latency_samples_ns.begin(), latency_samples_ns.end());
    886   const double min_sample_time_s =
    887       static_cast<double>(min_sample_time_ns) / kNanosPerSecond;
    888 
    889   const uint64_t max_sample_time_ns =
    890       *std::max_element(latency_samples_ns.begin(), latency_samples_ns.end());
    891   const double max_sample_time_s =
    892       static_cast<double>(max_sample_time_ns) / kNanosPerSecond;
    893 
    894   const double total_sample_time_s =
    895       std::accumulate(latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
    896                       [](double s, uint64_t ns) {
    897                         return s + static_cast<double>(ns) / kNanosPerSecond;
    898                       });
    899   const double average_sample_time_s =
    900       total_sample_time_s / latency_samples_ns.size();
    901 
    902   const double sum_of_squared_deviations = std::accumulate(
    903       latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
    904       [&](double s, uint64_t ns) {
    905         const double delta =
    906             static_cast<double>(ns) / kNanosPerSecond - average_sample_time_s;
    907         return s + delta * delta;
    908       });
    909   const double variance = sum_of_squared_deviations / latency_samples_ns.size();
    910   const double standard_deviation = std::sqrt(variance);
    911 
    912   const int num_buckets = 200;
    913   const uint64_t sample_range_ns = max_sample_time_ns - min_sample_time_ns;
    914   const uint64_t ns_per_bucket = sample_range_ns / num_buckets;
    915   std::array<uint64_t, num_buckets> sample_buckets = {{0}};
    916 
    917   // Count samples in each bucket range.
    918   for (uint64_t sample_ns : latency_samples_ns) {
    919     sample_buckets[(sample_ns - min_sample_time_ns) / (ns_per_bucket + 1)] += 1;
    920   }
    921 
    922   // Calculate population percentiles.
    923   const uint64_t percent_50 =
    924       static_cast<uint64_t>(latency_samples_ns.size() * 0.5);
    925   const uint64_t percent_90 =
    926       static_cast<uint64_t>(latency_samples_ns.size() * 0.9);
    927   const uint64_t percent_95 =
    928       static_cast<uint64_t>(latency_samples_ns.size() * 0.95);
    929   const uint64_t percent_99 =
    930       static_cast<uint64_t>(latency_samples_ns.size() * 0.99);
    931 
    932   uint64_t sample_count = 0;
    933   double latency_50th_percentile_s, latency_90th_percentile_s,
    934       latency_95th_percentile_s, latency_99th_percentile_s;
    935   for (int i = 0; i < num_buckets; i++) {
    // Report the midpoint of the bucket range as the value of the
    // corresponding percentile.
    939     const double bucket_midpoint_time_s =
    940         (ns_per_bucket * i + 0.5 * ns_per_bucket + min_sample_time_ns) /
    941         kNanosPerSecond;
    942     if (sample_count < percent_50 &&
    943         (sample_count + sample_buckets[i]) >= percent_50) {
    944       latency_50th_percentile_s = bucket_midpoint_time_s;
    945     }
    946     if (sample_count < percent_90 &&
    947         (sample_count + sample_buckets[i]) >= percent_90) {
    948       latency_90th_percentile_s = bucket_midpoint_time_s;
    949     }
    950     if (sample_count < percent_95 &&
    951         (sample_count + sample_buckets[i]) >= percent_95) {
    952       latency_95th_percentile_s = bucket_midpoint_time_s;
    953     }
    954     if (sample_count < percent_99 &&
    955         (sample_count + sample_buckets[i]) >= percent_99) {
    956       latency_99th_percentile_s = bucket_midpoint_time_s;
    957     }
    958     sample_count += sample_buckets[i];
    959   }
    960 
    961   std::cerr << std::fixed << "Total throughput over " << total_threads
    962             << " threads:\n\t " << total_bytes << " bytes in " << average_time_s
    963             << " seconds (" << std::setprecision(0)
    964             << (total_bytes / 1024.0 / average_time_s) << " K/s; "
    965             << std::setprecision(3)
    966             << (iterations * total_threads / average_time_s)
    967             << std::setprecision(9) << " txn/s; "
    968             << (average_time_s / (iterations * total_threads)) << " s/txn)"
    969             << std::endl;
    970   std::cerr << "Sample statistics: " << std::endl;
    971   std::cerr << total_sample_time_s << " s total sample time" << std::endl;
    972   std::cerr << average_sample_time_s << " s avg" << std::endl;
    973   std::cerr << standard_deviation << " s std dev" << std::endl;
    974   std::cerr << min_sample_time_s << " s min" << std::endl;
    975   std::cerr << max_sample_time_s << " s max" << std::endl;
    976   std::cerr << "Latency percentiles:" << std::endl;
    977   std::cerr << "50th: " << latency_50th_percentile_s << " s" << std::endl;
    978   std::cerr << "90th: " << latency_90th_percentile_s << " s" << std::endl;
    979   std::cerr << "95th: " << latency_95th_percentile_s << " s" << std::endl;
    980   std::cerr << "99th: " << latency_99th_percentile_s << " s" << std::endl;
    981 
    982   std::cout << total_time_ns << " " << std::fixed << std::setprecision(9)
    983             << average_sample_time_s << " " << std::fixed
    984             << std::setprecision(9) << standard_deviation << std::endl;
    985   return 0;
    986 }
    987 
    988 int Usage(const std::string& command_name) {
    989   // clang-format off
    990   std::cout << "Usage: " << command_name << " [options]" << std::endl;
    991   std::cout << "\t--verbose                   : Use verbose messages." << std::endl;
    992   std::cout << "\t--service <endpoint path>   : Start service at the given path." << std::endl;
    993   std::cout << "\t--client <endpoint path>    : Start client to the given path." << std::endl;
    994   std::cout << "\t--op <read | write | echo>  : Sepcify client operation mode." << std::endl;
    995   std::cout << "\t--bs <block size bytes>     : Sepcify block size to use." << std::endl;
    996   std::cout << "\t--count <count>             : Sepcify number of transactions to make." << std::endl;
    997   std::cout << "\t--instances <count>         : Specify number of service instances." << std::endl;
    998   std::cout << "\t--threads <count>           : Sepcify number of threads per instance." << std::endl;
    999   std::cout << "\t--timeout <timeout ms | -1> : Timeout to wait for services." << std::endl;
   1000   std::cout << "\t--trace                     : Enable systrace logging." << std::endl;
   1001   std::cout << "\t--warmup <iterations>       : Busy loops before running benchmarks." << std::endl;
   1002   // clang-format on
   1003   return -1;
   1004 }
   1005 
   1006 }  // anonymous namespace
   1007 
   1008 int main(int argc, char** argv) {
   1009   logging::LoggingSettings logging_settings;
   1010   logging_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
   1011   logging::InitLogging(logging_settings);
   1012 
   1013   int getopt_code;
   1014   int option_index;
   1015   std::string option = "";
   1016   std::string command = "";
   1017   std::string command_argument = "";
   1018   bool tracing_enabled = false;
   1019 
   1020   // Process command line options.
   1021   while ((getopt_code =
   1022               getopt_long(argc, argv, "", long_options, &option_index)) != -1) {
   1023     option = long_options[option_index].name;
   1024     VLOG(1) << "option=" << option;
   1025     switch (getopt_code) {
   1026       case 0:
   1027         if (option == kOptionVerbose) {
   1028           ProgramOptions.verbose = true;
   1029           logging::SetMinLogLevel(-1);
   1030         } else if (option == kOptionOpcode) {
   1031           ParseOpcodeOption(optarg);
   1032         } else if (option == kOptionBlocksize) {
   1033           ProgramOptions.blocksize = std::stoi(optarg);
   1034           if (ProgramOptions.blocksize < 0) {
   1035             std::cerr << "Invalid blocksize argument: "
   1036                       << ProgramOptions.blocksize << std::endl;
   1037             return -EINVAL;
   1038           }
   1039         } else if (option == kOptionCount) {
   1040           ProgramOptions.count = std::stoi(optarg);
   1041           if (ProgramOptions.count < 1) {
   1042             std::cerr << "Invalid count argument: " << ProgramOptions.count
   1043                       << std::endl;
   1044             return -EINVAL;
   1045           }
   1046         } else if (option == kOptionThreads) {
   1047           ProgramOptions.threads = std::stoi(optarg);
   1048           if (ProgramOptions.threads < 1) {
   1049             std::cerr << "Invalid threads argument: " << ProgramOptions.threads
   1050                       << std::endl;
   1051             return -EINVAL;
   1052           }
   1053         } else if (option == kOptionInstances) {
   1054           ProgramOptions.instances = std::stoi(optarg);
   1055           if (ProgramOptions.instances < 1) {
   1056             std::cerr << "Invalid instances argument: "
   1057                       << ProgramOptions.instances << std::endl;
   1058             return -EINVAL;
   1059           }
   1060         } else if (option == kOptionTimeout) {
   1061           ProgramOptions.timeout = std::stoi(optarg);
   1062         } else if (option == kOptionTrace) {
   1063           tracing_enabled = true;
   1064         } else if (option == kOptionWarmup) {
   1065           ProgramOptions.warmup = std::stoi(optarg);
   1066         } else {
   1067           command = option;
   1068           if (optarg)
   1069             command_argument = optarg;
   1070         }
   1071         break;
   1072     }
   1073   }
   1074 
   1075   // Setup ATRACE/systrace based on command line.
   1076   atrace_setup();
   1077   atrace_set_tracing_enabled(tracing_enabled);
   1078 
   1079   VLOG(1) << "command=" << command << " command_argument=" << command_argument;
   1080 
   1081   if (command == "") {
   1082     return Usage(argv[0]);
   1083   } else if (command == kOptionService) {
   1084     return ServiceCommand(command_argument);
   1085   } else if (command == kOptionClient) {
   1086     return ClientCommand(command_argument);
   1087   } else {
   1088     return Usage(argv[0]);
   1089   }
   1090 }
   1091