1 // Use ALWAYS at the tag level. Control is performed manually during command 2 // line processing. 3 #define ATRACE_TAG ATRACE_TAG_ALWAYS 4 #include <utils/Trace.h> 5 6 #include <base/files/file_util.h> 7 #include <base/logging.h> 8 #include <base/strings/string_split.h> 9 #include <errno.h> 10 #include <getopt.h> 11 #include <pdx/client.h> 12 #include <pdx/default_transport/client_channel_factory.h> 13 #include <pdx/default_transport/service_endpoint.h> 14 #include <pdx/rpc/buffer_wrapper.h> 15 #include <pdx/rpc/default_initialization_allocator.h> 16 #include <pdx/rpc/message_buffer.h> 17 #include <pdx/rpc/remote_method.h> 18 #include <pdx/rpc/serializable.h> 19 #include <pdx/service.h> 20 #include <sys/prctl.h> 21 #include <time.h> 22 #include <unistd.h> 23 24 #include <atomic> 25 #include <cstdlib> 26 #include <functional> 27 #include <future> 28 #include <iomanip> 29 #include <ios> 30 #include <iostream> 31 #include <memory> 32 #include <numeric> 33 #include <sstream> 34 #include <string> 35 #include <thread> 36 #include <vector> 37 38 using android::pdx::Channel; 39 using android::pdx::ClientBase; 40 using android::pdx::Endpoint; 41 using android::pdx::ErrorStatus; 42 using android::pdx::Message; 43 using android::pdx::Service; 44 using android::pdx::ServiceBase; 45 using android::pdx::default_transport::ClientChannelFactory; 46 using android::pdx::Status; 47 using android::pdx::Transaction; 48 using android::pdx::rpc::BufferWrapper; 49 using android::pdx::rpc::DefaultInitializationAllocator; 50 using android::pdx::rpc::MessageBuffer; 51 using android::pdx::rpc::DispatchRemoteMethod; 52 using android::pdx::rpc::RemoteMethodReturn; 53 using android::pdx::rpc::ReplyBuffer; 54 using android::pdx::rpc::Void; 55 using android::pdx::rpc::WrapBuffer; 56 57 namespace { 58 59 constexpr size_t kMaxMessageSize = 4096 * 1024; 60 61 std::string GetServicePath(const std::string& path, int instance_id) { 62 return path + std::to_string(instance_id); 63 } 64 65 void 
SetThreadName(const std::string& name) { 66 prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(name.c_str()), 0, 0, 0); 67 } 68 69 constexpr uint64_t kNanosPerSecond = 1000000000llu; 70 71 uint64_t GetClockNs() { 72 timespec t; 73 clock_gettime(CLOCK_MONOTONIC, &t); 74 return kNanosPerSecond * t.tv_sec + t.tv_nsec; 75 } 76 77 template <typename T> 78 ssize_t ssizeof(const T&) { 79 return static_cast<ssize_t>(sizeof(T)); 80 } 81 82 class SchedStats { 83 public: 84 SchedStats() : SchedStats(gettid()) {} 85 SchedStats(pid_t task_id) : task_id_(task_id) {} 86 SchedStats(const SchedStats&) = default; 87 SchedStats& operator=(const SchedStats&) = default; 88 89 void Update() { 90 const std::string stats_path = 91 "/proc/" + std::to_string(task_id_) + "/schedstat"; 92 93 std::string line; 94 base::ReadFileToString(base::FilePath{stats_path}, &line); 95 std::vector<std::string> stats = base::SplitString( 96 line, " ", base::TRIM_WHITESPACE, base::SPLIT_WANT_ALL); 97 98 CHECK_EQ(3u, stats.size()); 99 100 // Calculate the deltas since the last update. Each value is absolute since 101 // the task started. 
102 uint64_t current_cpu_time_ns = std::stoull(stats[0]); 103 uint64_t current_wait_ns = std::stoull(stats[1]); 104 uint64_t current_timeslices = std::stoull(stats[2]); 105 cpu_time_ns_ = current_cpu_time_ns - last_cpu_time_ns_; 106 wait_ns_ = current_wait_ns - last_wait_ns_; 107 timeslices_ = current_timeslices - last_timeslices_; 108 last_cpu_time_ns_ = current_cpu_time_ns; 109 last_wait_ns_ = current_wait_ns; 110 last_timeslices_ = current_timeslices; 111 } 112 113 pid_t task_id() const { return task_id_; } 114 uint64_t cpu_time_ns() const { return cpu_time_ns_; } 115 uint64_t wait_ns() const { return wait_ns_; } 116 uint64_t timeslices() const { return timeslices_; } 117 118 double cpu_time_s() const { 119 return static_cast<double>(cpu_time_ns_) / kNanosPerSecond; 120 } 121 double wait_s() const { 122 return static_cast<double>(wait_ns_) / kNanosPerSecond; 123 } 124 125 private: 126 int32_t task_id_; 127 uint64_t cpu_time_ns_ = 0; 128 uint64_t last_cpu_time_ns_ = 0; 129 uint64_t wait_ns_ = 0; 130 uint64_t last_wait_ns_ = 0; 131 uint64_t timeslices_ = 0; 132 uint64_t last_timeslices_ = 0; 133 134 PDX_SERIALIZABLE_MEMBERS(SchedStats, task_id_, cpu_time_ns_, wait_ns_, 135 timeslices_); 136 }; 137 138 // Opcodes for client/service protocol. 
struct BenchmarkOps {
  // Anonymous, int-backed enumeration: the enumerators are used as plain int
  // opcodes (Options::opcode below stores one of them in an int).
  enum : int {
    Nop,
    Read,
    Write,
    Echo,
    Stats,
    WriteVector,
    EchoVector,
    Quit,
  };
};

// Typed signatures, declared with PDX_REMOTE_METHOD, for the opcodes that are
// invoked through the remote-method helpers instead of raw buffer transfers.
struct BenchmarkRPC {
  // Takes no arguments (Void); returns two uint64_t values and a SchedStats.
  PDX_REMOTE_METHOD(Stats, BenchmarkOps::Stats,
                    std::tuple<uint64_t, uint64_t, SchedStats>(Void));
  // Takes a wrapped byte vector; returns an int.
  PDX_REMOTE_METHOD(WriteVector, BenchmarkOps::WriteVector,
                    int(const BufferWrapper<std::vector<uint8_t>> data));
  // Takes a wrapped byte vector; returns a wrapped byte vector.
  PDX_REMOTE_METHOD(EchoVector, BenchmarkOps::EchoVector,
                    BufferWrapper<std::vector<uint8_t>>(
                        const BufferWrapper<std::vector<uint8_t>> data));
};

// Per-thread outcome of a client benchmark run.
struct BenchmarkResult {
  int thread_id = 0;            // Index of the client thread.
  int service_id = 0;           // Index of the service instance it talked to.
  double time_delta_s = 0.0;    // Elapsed time for the run, in seconds.
  uint64_t bytes_sent = 0;      // Payload bytes accounted for by the thread.
  SchedStats sched_stats = {};  // Scheduler statistics for the thread.
};

// Global command line option values.
struct Options {
  bool verbose = false;               // --verbose
  int threads = 1;                    // --threads: threads per instance.
  int opcode = BenchmarkOps::Read;    // --op: one of the BenchmarkOps values.
  int blocksize = 1;                  // --bs: block size in bytes.
  int count = 1;                      // --count: transactions per thread.
  int instances = 1;                  // --instances: service instances.
  int timeout = 1;                    // --timeout
  int warmup = 0;                     // --warmup: busy-loop iterations.
} ProgramOptions;

// Command line option names.
const char kOptionService[] = "service";
const char kOptionClient[] = "client";
const char kOptionVerbose[] = "verbose";
const char kOptionOpcode[] = "op";
const char kOptionBlocksize[] = "bs";
const char kOptionCount[] = "count";
const char kOptionThreads[] = "threads";
const char kOptionInstances[] = "instances";
const char kOptionTimeout[] = "timeout";
const char kOptionTrace[] = "trace";
const char kOptionWarmup[] = "warmup";

// getopt() long options.
196 static option long_options[] = { 197 {kOptionService, required_argument, 0, 0}, 198 {kOptionClient, required_argument, 0, 0}, 199 {kOptionVerbose, no_argument, 0, 0}, 200 {kOptionOpcode, required_argument, 0, 0}, 201 {kOptionBlocksize, required_argument, 0, 0}, 202 {kOptionCount, required_argument, 0, 0}, 203 {kOptionThreads, required_argument, 0, 0}, 204 {kOptionInstances, required_argument, 0, 0}, 205 {kOptionTimeout, required_argument, 0, 0}, 206 {kOptionTrace, no_argument, 0, 0}, 207 {kOptionWarmup, required_argument, 0, 0}, 208 {0, 0, 0, 0}, 209 }; 210 211 // Parses the argument for kOptionOpcode and sets the value of 212 // ProgramOptions.opcode. 213 void ParseOpcodeOption(const std::string& argument) { 214 if (argument == "read") { 215 ProgramOptions.opcode = BenchmarkOps::Read; 216 } else if (argument == "write") { 217 ProgramOptions.opcode = BenchmarkOps::Write; 218 } else if (argument == "echo") { 219 ProgramOptions.opcode = BenchmarkOps::Echo; 220 } else if (argument == "writevec") { 221 ProgramOptions.opcode = BenchmarkOps::WriteVector; 222 } else if (argument == "echovec") { 223 ProgramOptions.opcode = BenchmarkOps::EchoVector; 224 } else if (argument == "quit") { 225 ProgramOptions.opcode = BenchmarkOps::Quit; 226 } else if (argument == "nop") { 227 ProgramOptions.opcode = BenchmarkOps::Nop; 228 } else if (argument == "stats") { 229 ProgramOptions.opcode = BenchmarkOps::Stats; 230 } else { 231 ProgramOptions.opcode = std::stoi(argument); 232 } 233 } 234 235 // Implements the service side of the benchmark. 
236 class BenchmarkService : public ServiceBase<BenchmarkService> { 237 public: 238 std::shared_ptr<Channel> OnChannelOpen(Message& message) override { 239 VLOG(1) << "BenchmarkService::OnChannelCreate: cid=" 240 << message.GetChannelId(); 241 return nullptr; 242 } 243 244 void OnChannelClose(Message& message, 245 const std::shared_ptr<Channel>& /*channel*/) override { 246 VLOG(1) << "BenchmarkService::OnChannelClose: cid=" 247 << message.GetChannelId(); 248 } 249 250 Status<void> HandleMessage(Message& message) override { 251 ATRACE_NAME("BenchmarkService::HandleMessage"); 252 253 switch (message.GetOp()) { 254 case BenchmarkOps::Nop: 255 VLOG(1) << "BenchmarkService::HandleMessage: op=nop"; 256 { 257 ATRACE_NAME("Reply"); 258 CHECK(message.Reply(0)); 259 } 260 return {}; 261 262 case BenchmarkOps::Write: { 263 VLOG(1) << "BenchmarkService::HandleMessage: op=write send_length=" 264 << message.GetSendLength() 265 << " receive_length=" << message.GetReceiveLength(); 266 267 Status<void> status; 268 if (message.GetSendLength()) 269 status = message.ReadAll(send_buffer.data(), message.GetSendLength()); 270 271 { 272 ATRACE_NAME("Reply"); 273 if (!status) 274 CHECK(message.ReplyError(status.error())); 275 else 276 CHECK(message.Reply(message.GetSendLength())); 277 } 278 return {}; 279 } 280 281 case BenchmarkOps::Read: { 282 VLOG(1) << "BenchmarkService::HandleMessage: op=read send_length=" 283 << message.GetSendLength() 284 << " receive_length=" << message.GetReceiveLength(); 285 286 Status<void> status; 287 if (message.GetReceiveLength()) { 288 status = message.WriteAll(receive_buffer.data(), 289 message.GetReceiveLength()); 290 } 291 292 { 293 ATRACE_NAME("Reply"); 294 if (!status) 295 CHECK(message.ReplyError(status.error())); 296 else 297 CHECK(message.Reply(message.GetReceiveLength())); 298 } 299 return {}; 300 } 301 302 case BenchmarkOps::Echo: { 303 VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length=" 304 << message.GetSendLength() 305 << " 
receive_length=" << message.GetReceiveLength(); 306 307 Status<void> status; 308 if (message.GetSendLength()) 309 status = message.ReadAll(send_buffer.data(), message.GetSendLength()); 310 311 if (!status) { 312 CHECK(message.ReplyError(status.error())); 313 return {}; 314 } 315 316 if (message.GetSendLength()) { 317 status = 318 message.WriteAll(send_buffer.data(), message.GetSendLength()); 319 } 320 321 { 322 ATRACE_NAME("Reply"); 323 if (!status) 324 CHECK(message.ReplyError(status.error())); 325 else 326 CHECK(message.Reply(message.GetSendLength())); 327 } 328 return {}; 329 } 330 331 case BenchmarkOps::Stats: { 332 VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length=" 333 << message.GetSendLength() 334 << " receive_length=" << message.GetReceiveLength(); 335 336 // Snapshot the stats when the message is received. 337 const uint64_t receive_time_ns = GetClockNs(); 338 sched_stats_.Update(); 339 340 // Use the RPC system to return the results. 341 RemoteMethodReturn<BenchmarkRPC::Stats>( 342 message, BenchmarkRPC::Stats::Return{receive_time_ns, GetClockNs(), 343 sched_stats_}); 344 return {}; 345 } 346 347 case BenchmarkOps::WriteVector: 348 VLOG(1) << "BenchmarkService::HandleMessage: op=writevec send_length=" 349 << message.GetSendLength() 350 << " receive_length=" << message.GetReceiveLength(); 351 352 DispatchRemoteMethod<BenchmarkRPC::WriteVector>( 353 *this, &BenchmarkService::OnWriteVector, message, kMaxMessageSize); 354 return {}; 355 356 case BenchmarkOps::EchoVector: 357 VLOG(1) << "BenchmarkService::HandleMessage: op=echovec send_length=" 358 << message.GetSendLength() 359 << " receive_length=" << message.GetReceiveLength(); 360 361 DispatchRemoteMethod<BenchmarkRPC::EchoVector>( 362 *this, &BenchmarkService::OnEchoVector, message, kMaxMessageSize); 363 return {}; 364 365 case BenchmarkOps::Quit: 366 Cancel(); 367 return ErrorStatus{ESHUTDOWN}; 368 369 default: 370 VLOG(1) << "BenchmarkService::HandleMessage: default case; op=" 371 << 
message.GetOp(); 372 return Service::DefaultHandleMessage(message); 373 } 374 } 375 376 // Updates the scheduler stats from procfs for this thread. 377 void UpdateSchedStats() { sched_stats_.Update(); } 378 379 private: 380 friend BASE; 381 382 BenchmarkService(std::unique_ptr<Endpoint> endpoint) 383 : BASE("BenchmarkService", std::move(endpoint)), 384 send_buffer(kMaxMessageSize), 385 receive_buffer(kMaxMessageSize) {} 386 387 std::vector<uint8_t> send_buffer; 388 std::vector<uint8_t> receive_buffer; 389 390 // Each service thread has its own scheduler stats object. 391 static thread_local SchedStats sched_stats_; 392 393 using BufferType = BufferWrapper< 394 std::vector<uint8_t, DefaultInitializationAllocator<uint8_t>>>; 395 396 int OnWriteVector(Message&, const BufferType& data) { return data.size(); } 397 BufferType OnEchoVector(Message&, BufferType&& data) { 398 return std::move(data); 399 } 400 401 BenchmarkService(const BenchmarkService&) = delete; 402 void operator=(const BenchmarkService&) = delete; 403 }; 404 405 thread_local SchedStats BenchmarkService::sched_stats_; 406 407 // Implements the client side of the benchmark. 
408 class BenchmarkClient : public ClientBase<BenchmarkClient> { 409 public: 410 int Nop() { 411 ATRACE_NAME("BenchmarkClient::Nop"); 412 VLOG(1) << "BenchmarkClient::Nop"; 413 Transaction transaction{*this}; 414 return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Nop)); 415 } 416 417 int Write(const void* buffer, size_t length) { 418 ATRACE_NAME("BenchmarkClient::Write"); 419 VLOG(1) << "BenchmarkClient::Write: buffer=" << buffer 420 << " length=" << length; 421 Transaction transaction{*this}; 422 return ReturnStatusOrError( 423 transaction.Send<int>(BenchmarkOps::Write, buffer, length, nullptr, 0)); 424 // return write(endpoint_fd(), buffer, length); 425 } 426 427 int Read(void* buffer, size_t length) { 428 ATRACE_NAME("BenchmarkClient::Read"); 429 VLOG(1) << "BenchmarkClient::Read: buffer=" << buffer 430 << " length=" << length; 431 Transaction transaction{*this}; 432 return ReturnStatusOrError( 433 transaction.Send<int>(BenchmarkOps::Read, nullptr, 0, buffer, length)); 434 // return read(endpoint_fd(), buffer, length); 435 } 436 437 int Echo(const void* send_buffer, size_t send_length, void* receive_buffer, 438 size_t receive_length) { 439 ATRACE_NAME("BenchmarkClient::Echo"); 440 VLOG(1) << "BenchmarkClient::Echo: send_buffer=" << send_buffer 441 << " send_length=" << send_length 442 << " receive_buffer=" << receive_buffer 443 << " receive_length=" << receive_length; 444 Transaction transaction{*this}; 445 return ReturnStatusOrError( 446 transaction.Send<int>(BenchmarkOps::Echo, send_buffer, send_length, 447 receive_buffer, receive_length)); 448 } 449 450 int Stats(std::tuple<uint64_t, uint64_t, SchedStats>* stats_out) { 451 ATRACE_NAME("BenchmarkClient::Stats"); 452 VLOG(1) << "BenchmarkClient::Stats"; 453 454 auto status = InvokeRemoteMethodInPlace<BenchmarkRPC::Stats>(stats_out); 455 return status ? 
0 : -status.error(); 456 } 457 458 int WriteVector(const BufferWrapper<std::vector<uint8_t>>& data) { 459 ATRACE_NAME("BenchmarkClient::Stats"); 460 VLOG(1) << "BenchmarkClient::Stats"; 461 462 auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data); 463 return ReturnStatusOrError(status); 464 } 465 466 template <typename T> 467 int WriteVector(const BufferWrapper<T>& data) { 468 ATRACE_NAME("BenchmarkClient::WriteVector"); 469 VLOG(1) << "BenchmarkClient::WriteVector"; 470 471 auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data); 472 return ReturnStatusOrError(status); 473 } 474 475 template <typename T, typename U> 476 int EchoVector(const BufferWrapper<T>& data, BufferWrapper<U>* data_out) { 477 ATRACE_NAME("BenchmarkClient::EchoVector"); 478 VLOG(1) << "BenchmarkClient::EchoVector"; 479 480 MessageBuffer<ReplyBuffer>::Reserve(kMaxMessageSize - 1); 481 auto status = 482 InvokeRemoteMethodInPlace<BenchmarkRPC::EchoVector>(data_out, data); 483 return status ? 0 : -status.error(); 484 } 485 486 int Quit() { 487 VLOG(1) << "BenchmarkClient::Quit"; 488 Transaction transaction{*this}; 489 return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Echo)); 490 } 491 492 private: 493 friend BASE; 494 495 BenchmarkClient(const std::string& service_path) 496 : BASE(ClientChannelFactory::Create(service_path), 497 ProgramOptions.timeout) {} 498 499 BenchmarkClient(const BenchmarkClient&) = delete; 500 void operator=(const BenchmarkClient&) = delete; 501 }; 502 503 // Creates a benchmark service at |path| and dispatches messages. 504 int ServiceCommand(const std::string& path) { 505 if (path.empty()) 506 return -EINVAL; 507 508 // Start the requested number of dispatch threads. 
509 std::vector<std::thread> dispatch_threads; 510 int service_count = ProgramOptions.instances; 511 int service_id_counter = 0; 512 int thread_id_counter = 0; 513 std::atomic<bool> done(false); 514 515 while (service_count--) { 516 std::cerr << "Starting service instance " << service_id_counter 517 << std::endl; 518 auto service = BenchmarkService::Create( 519 android::pdx::default_transport::Endpoint::CreateAndBindSocket( 520 GetServicePath(path, service_id_counter), 521 android::pdx::default_transport::Endpoint::kBlocking)); 522 if (!service) { 523 std::cerr << "Failed to create service instance!!" << std::endl; 524 done = true; 525 break; 526 } 527 528 int thread_count = ProgramOptions.threads; 529 while (thread_count--) { 530 std::cerr << "Starting dispatch thread " << thread_id_counter 531 << " service " << service_id_counter << std::endl; 532 533 dispatch_threads.emplace_back( 534 [&](const int thread_id, const int service_id, 535 const std::shared_ptr<BenchmarkService>& local_service) { 536 SetThreadName("service" + std::to_string(service_id)); 537 538 // Read the initial schedstats for this thread from procfs. 539 local_service->UpdateSchedStats(); 540 541 ATRACE_NAME("BenchmarkService::Dispatch"); 542 while (!done) { 543 auto ret = local_service->ReceiveAndDispatch(); 544 if (!ret) { 545 if (ret.error() != ESHUTDOWN) { 546 std::cerr << "Error while dispatching message on thread " 547 << thread_id << " service " << service_id << ": " 548 << ret.GetErrorMessage() << std::endl; 549 } else { 550 std::cerr << "Quitting thread " << thread_id << " service " 551 << service_id << std::endl; 552 } 553 done = true; 554 return; 555 } 556 } 557 }, 558 thread_id_counter++, service_id_counter, service); 559 } 560 561 service_id_counter++; 562 } 563 564 // Wait for the dispatch threads to exit. 
565 for (auto& thread : dispatch_threads) { 566 thread.join(); 567 } 568 569 return 0; 570 } 571 572 int ClientCommand(const std::string& path) { 573 // Start the requested number of client threads. 574 std::vector<std::thread> client_threads; 575 std::vector<std::future<BenchmarkResult>> client_results; 576 int service_count = ProgramOptions.instances; 577 int thread_id_counter = 0; 578 int service_id_counter = 0; 579 580 // Aggregate statistics, updated when worker threads exit. 581 std::atomic<uint64_t> total_bytes(0); 582 std::atomic<uint64_t> total_time_ns(0); 583 584 // Samples for variance calculation. 585 std::vector<uint64_t> latency_samples_ns( 586 ProgramOptions.instances * ProgramOptions.threads * ProgramOptions.count); 587 const size_t samples_per_thread = ProgramOptions.count; 588 589 std::vector<uint8_t> send_buffer(ProgramOptions.blocksize); 590 std::vector<uint8_t> receive_buffer(kMaxMessageSize); 591 592 // Barriers for synchronizing thread start. 593 std::vector<std::future<void>> ready_barrier_futures; 594 std::promise<void> go_barrier_promise; 595 std::future<void> go_barrier_future = go_barrier_promise.get_future(); 596 597 // Barrier for synchronizing thread tear down. 
598 std::promise<void> done_barrier_promise; 599 std::future<void> done_barrier_future = done_barrier_promise.get_future(); 600 601 while (service_count--) { 602 int thread_count = ProgramOptions.threads; 603 while (thread_count--) { 604 std::cerr << "Starting client thread " << thread_id_counter << " service " 605 << service_id_counter << std::endl; 606 607 std::promise<BenchmarkResult> result_promise; 608 client_results.push_back(result_promise.get_future()); 609 610 std::promise<void> ready_barrier_promise; 611 ready_barrier_futures.push_back(ready_barrier_promise.get_future()); 612 613 client_threads.emplace_back( 614 [&](const int thread_id, const int service_id, 615 std::promise<BenchmarkResult> result, std::promise<void> ready) { 616 SetThreadName("client" + std::to_string(thread_id) + "/" + 617 std::to_string(service_id)); 618 619 ATRACE_NAME("BenchmarkClient::Dispatch"); 620 621 auto client = 622 BenchmarkClient::Create(GetServicePath(path, service_id)); 623 if (!client) { 624 std::cerr << "Failed to create client for service " << service_id 625 << std::endl; 626 return -ENOMEM; 627 } 628 629 uint64_t* thread_samples = 630 &latency_samples_ns[samples_per_thread * thread_id]; 631 632 // Per-thread statistics. 633 uint64_t bytes_sent = 0; 634 uint64_t time_start_ns; 635 uint64_t time_end_ns; 636 SchedStats sched_stats; 637 638 // Signal ready and wait for go. 639 ready.set_value(); 640 go_barrier_future.wait(); 641 642 // Warmup the scheduler. 
643 int warmup = ProgramOptions.warmup; 644 while (warmup--) { 645 for (int i = 0; i < 1000000; i++) 646 ; 647 } 648 649 sched_stats.Update(); 650 time_start_ns = GetClockNs(); 651 652 int count = ProgramOptions.count; 653 while (count--) { 654 uint64_t iteration_start_ns = GetClockNs(); 655 656 switch (ProgramOptions.opcode) { 657 case BenchmarkOps::Nop: { 658 const int ret = client->Nop(); 659 if (ret < 0) { 660 std::cerr << "Failed to send nop: " << strerror(-ret) 661 << std::endl; 662 return ret; 663 } else { 664 VLOG(1) << "Success"; 665 } 666 break; 667 } 668 669 case BenchmarkOps::Read: { 670 const int ret = client->Read(receive_buffer.data(), 671 ProgramOptions.blocksize); 672 if (ret < 0) { 673 std::cerr << "Failed to read: " << strerror(-ret) 674 << std::endl; 675 return ret; 676 } else if (ret != ProgramOptions.blocksize) { 677 std::cerr << "Expected ret=" << ProgramOptions.blocksize 678 << "; actual ret=" << ret << std::endl; 679 return -EINVAL; 680 } else { 681 VLOG(1) << "Success"; 682 bytes_sent += ret; 683 } 684 break; 685 } 686 687 case BenchmarkOps::Write: { 688 const int ret = 689 client->Write(send_buffer.data(), send_buffer.size()); 690 if (ret < 0) { 691 std::cerr << "Failed to write: " << strerror(-ret) 692 << std::endl; 693 return ret; 694 } else if (ret != ProgramOptions.blocksize) { 695 std::cerr << "Expected ret=" << ProgramOptions.blocksize 696 << "; actual ret=" << ret << std::endl; 697 return -EINVAL; 698 } else { 699 VLOG(1) << "Success"; 700 bytes_sent += ret; 701 } 702 break; 703 } 704 705 case BenchmarkOps::Echo: { 706 const int ret = client->Echo( 707 send_buffer.data(), send_buffer.size(), 708 receive_buffer.data(), receive_buffer.size()); 709 if (ret < 0) { 710 std::cerr << "Failed to echo: " << strerror(-ret) 711 << std::endl; 712 return ret; 713 } else if (ret != ProgramOptions.blocksize) { 714 std::cerr << "Expected ret=" << ProgramOptions.blocksize 715 << "; actual ret=" << ret << std::endl; 716 return -EINVAL; 717 } else { 
718 VLOG(1) << "Success"; 719 bytes_sent += ret * 2; 720 } 721 break; 722 } 723 724 case BenchmarkOps::Stats: { 725 std::tuple<uint64_t, uint64_t, SchedStats> stats; 726 const int ret = client->Stats(&stats); 727 if (ret < 0) { 728 std::cerr << "Failed to get stats: " << strerror(-ret) 729 << std::endl; 730 return ret; 731 } else { 732 VLOG(1) << "Success"; 733 std::cerr 734 << "Round trip: receive_time_ns=" << std::get<0>(stats) 735 << " reply_time_ns=" << std::get<1>(stats) 736 << " cpu_time_s=" << std::get<2>(stats).cpu_time_s() 737 << " wait_s=" << std::get<2>(stats).wait_s() 738 << std::endl; 739 } 740 break; 741 } 742 743 case BenchmarkOps::WriteVector: { 744 const int ret = client->WriteVector( 745 WrapBuffer(send_buffer.data(), ProgramOptions.blocksize)); 746 if (ret < 0) { 747 std::cerr << "Failed to write vector: " << strerror(-ret) 748 << std::endl; 749 return ret; 750 } else { 751 VLOG(1) << "Success"; 752 bytes_sent += ret; 753 } 754 break; 755 } 756 757 case BenchmarkOps::EchoVector: { 758 thread_local BufferWrapper<std::vector< 759 uint8_t, DefaultInitializationAllocator<uint8_t>>> 760 response_buffer; 761 const int ret = client->EchoVector( 762 WrapBuffer(send_buffer.data(), ProgramOptions.blocksize), 763 &response_buffer); 764 if (ret < 0) { 765 std::cerr << "Failed to echo vector: " << strerror(-ret) 766 << std::endl; 767 return ret; 768 } else { 769 VLOG(1) << "Success"; 770 bytes_sent += send_buffer.size() + response_buffer.size(); 771 } 772 break; 773 } 774 775 case BenchmarkOps::Quit: { 776 const int ret = client->Quit(); 777 if (ret < 0 && ret != -ESHUTDOWN) { 778 std::cerr << "Failed to send quit: " << strerror(-ret); 779 return ret; 780 } else { 781 VLOG(1) << "Success"; 782 } 783 break; 784 } 785 786 default: 787 std::cerr 788 << "Invalid client operation: " << ProgramOptions.opcode 789 << std::endl; 790 return -EINVAL; 791 } 792 793 uint64_t iteration_end_ns = GetClockNs(); 794 uint64_t iteration_delta_ns = 795 iteration_end_ns - 
iteration_start_ns; 796 thread_samples[count] = iteration_delta_ns; 797 798 if (iteration_delta_ns > (kNanosPerSecond / 100)) { 799 SchedStats stats = sched_stats; 800 stats.Update(); 801 std::cerr << "Thread " << thread_id << " iteration_delta_s=" 802 << (static_cast<double>(iteration_delta_ns) / 803 kNanosPerSecond) 804 << " " << stats.cpu_time_s() << " " << stats.wait_s() 805 << std::endl; 806 } 807 } 808 809 time_end_ns = GetClockNs(); 810 sched_stats.Update(); 811 812 const double time_delta_s = 813 static_cast<double>(time_end_ns - time_start_ns) / 814 kNanosPerSecond; 815 816 total_bytes += bytes_sent; 817 total_time_ns += time_end_ns - time_start_ns; 818 819 result.set_value( 820 {thread_id, service_id, time_delta_s, bytes_sent, sched_stats}); 821 done_barrier_future.wait(); 822 823 return 0; 824 }, 825 thread_id_counter++, service_id_counter, std::move(result_promise), 826 std::move(ready_barrier_promise)); 827 } 828 829 service_id_counter++; 830 } 831 832 // Wait for workers to be ready. 833 std::cerr << "Waiting for workers to be ready..." << std::endl; 834 for (auto& ready : ready_barrier_futures) 835 ready.wait(); 836 837 // Signal workers to go. 838 std::cerr << "Kicking off benchmark." << std::endl; 839 go_barrier_promise.set_value(); 840 841 // Wait for all the worker threas to finish. 842 for (auto& result : client_results) 843 result.wait(); 844 845 // Report worker thread results. 
846 for (auto& result : client_results) { 847 BenchmarkResult benchmark_result = result.get(); 848 std::cerr << std::fixed << "Thread " << benchmark_result.thread_id 849 << " service " << benchmark_result.service_id << ":" << std::endl; 850 std::cerr << "\t " << benchmark_result.bytes_sent << " bytes in " 851 << benchmark_result.time_delta_s << " seconds (" 852 << std::setprecision(0) << (benchmark_result.bytes_sent / 1024.0 / 853 benchmark_result.time_delta_s) 854 << " K/s; " << std::setprecision(3) 855 << (ProgramOptions.count / benchmark_result.time_delta_s) 856 << " txn/s; " << std::setprecision(9) 857 << (benchmark_result.time_delta_s / ProgramOptions.count) 858 << " s/txn)" << std::endl; 859 std::cerr << "\tStats: " << benchmark_result.sched_stats.cpu_time_s() << " " 860 << (benchmark_result.sched_stats.cpu_time_s() / 861 ProgramOptions.count) 862 << " " << benchmark_result.sched_stats.wait_s() << " " 863 << (benchmark_result.sched_stats.wait_s() / ProgramOptions.count) 864 << " " << benchmark_result.sched_stats.timeslices() << std::endl; 865 } 866 867 // Signal worker threads to exit. 868 done_barrier_promise.set_value(); 869 870 // Wait for the worker threads to exit. 871 for (auto& thread : client_threads) { 872 thread.join(); 873 } 874 875 // Report aggregate results. 876 const int total_threads = ProgramOptions.threads * ProgramOptions.instances; 877 const int iterations = ProgramOptions.count; 878 const double total_time_s = 879 static_cast<double>(total_time_ns) / kNanosPerSecond; 880 // This is about how much wall time it took to completely transfer all the 881 // paylaods. 
882 const double average_time_s = total_time_s / total_threads; 883 884 const uint64_t min_sample_time_ns = 885 *std::min_element(latency_samples_ns.begin(), latency_samples_ns.end()); 886 const double min_sample_time_s = 887 static_cast<double>(min_sample_time_ns) / kNanosPerSecond; 888 889 const uint64_t max_sample_time_ns = 890 *std::max_element(latency_samples_ns.begin(), latency_samples_ns.end()); 891 const double max_sample_time_s = 892 static_cast<double>(max_sample_time_ns) / kNanosPerSecond; 893 894 const double total_sample_time_s = 895 std::accumulate(latency_samples_ns.begin(), latency_samples_ns.end(), 0.0, 896 [](double s, uint64_t ns) { 897 return s + static_cast<double>(ns) / kNanosPerSecond; 898 }); 899 const double average_sample_time_s = 900 total_sample_time_s / latency_samples_ns.size(); 901 902 const double sum_of_squared_deviations = std::accumulate( 903 latency_samples_ns.begin(), latency_samples_ns.end(), 0.0, 904 [&](double s, uint64_t ns) { 905 const double delta = 906 static_cast<double>(ns) / kNanosPerSecond - average_sample_time_s; 907 return s + delta * delta; 908 }); 909 const double variance = sum_of_squared_deviations / latency_samples_ns.size(); 910 const double standard_deviation = std::sqrt(variance); 911 912 const int num_buckets = 200; 913 const uint64_t sample_range_ns = max_sample_time_ns - min_sample_time_ns; 914 const uint64_t ns_per_bucket = sample_range_ns / num_buckets; 915 std::array<uint64_t, num_buckets> sample_buckets = {{0}}; 916 917 // Count samples in each bucket range. 918 for (uint64_t sample_ns : latency_samples_ns) { 919 sample_buckets[(sample_ns - min_sample_time_ns) / (ns_per_bucket + 1)] += 1; 920 } 921 922 // Calculate population percentiles. 
923 const uint64_t percent_50 = 924 static_cast<uint64_t>(latency_samples_ns.size() * 0.5); 925 const uint64_t percent_90 = 926 static_cast<uint64_t>(latency_samples_ns.size() * 0.9); 927 const uint64_t percent_95 = 928 static_cast<uint64_t>(latency_samples_ns.size() * 0.95); 929 const uint64_t percent_99 = 930 static_cast<uint64_t>(latency_samples_ns.size() * 0.99); 931 932 uint64_t sample_count = 0; 933 double latency_50th_percentile_s, latency_90th_percentile_s, 934 latency_95th_percentile_s, latency_99th_percentile_s; 935 for (int i = 0; i < num_buckets; i++) { 936 // Report the midpoint of the bucket range as the value of the 937 // corresponding 938 // percentile. 939 const double bucket_midpoint_time_s = 940 (ns_per_bucket * i + 0.5 * ns_per_bucket + min_sample_time_ns) / 941 kNanosPerSecond; 942 if (sample_count < percent_50 && 943 (sample_count + sample_buckets[i]) >= percent_50) { 944 latency_50th_percentile_s = bucket_midpoint_time_s; 945 } 946 if (sample_count < percent_90 && 947 (sample_count + sample_buckets[i]) >= percent_90) { 948 latency_90th_percentile_s = bucket_midpoint_time_s; 949 } 950 if (sample_count < percent_95 && 951 (sample_count + sample_buckets[i]) >= percent_95) { 952 latency_95th_percentile_s = bucket_midpoint_time_s; 953 } 954 if (sample_count < percent_99 && 955 (sample_count + sample_buckets[i]) >= percent_99) { 956 latency_99th_percentile_s = bucket_midpoint_time_s; 957 } 958 sample_count += sample_buckets[i]; 959 } 960 961 std::cerr << std::fixed << "Total throughput over " << total_threads 962 << " threads:\n\t " << total_bytes << " bytes in " << average_time_s 963 << " seconds (" << std::setprecision(0) 964 << (total_bytes / 1024.0 / average_time_s) << " K/s; " 965 << std::setprecision(3) 966 << (iterations * total_threads / average_time_s) 967 << std::setprecision(9) << " txn/s; " 968 << (average_time_s / (iterations * total_threads)) << " s/txn)" 969 << std::endl; 970 std::cerr << "Sample statistics: " << std::endl; 971 
std::cerr << total_sample_time_s << " s total sample time" << std::endl; 972 std::cerr << average_sample_time_s << " s avg" << std::endl; 973 std::cerr << standard_deviation << " s std dev" << std::endl; 974 std::cerr << min_sample_time_s << " s min" << std::endl; 975 std::cerr << max_sample_time_s << " s max" << std::endl; 976 std::cerr << "Latency percentiles:" << std::endl; 977 std::cerr << "50th: " << latency_50th_percentile_s << " s" << std::endl; 978 std::cerr << "90th: " << latency_90th_percentile_s << " s" << std::endl; 979 std::cerr << "95th: " << latency_95th_percentile_s << " s" << std::endl; 980 std::cerr << "99th: " << latency_99th_percentile_s << " s" << std::endl; 981 982 std::cout << total_time_ns << " " << std::fixed << std::setprecision(9) 983 << average_sample_time_s << " " << std::fixed 984 << std::setprecision(9) << standard_deviation << std::endl; 985 return 0; 986 } 987 988 int Usage(const std::string& command_name) { 989 // clang-format off 990 std::cout << "Usage: " << command_name << " [options]" << std::endl; 991 std::cout << "\t--verbose : Use verbose messages." << std::endl; 992 std::cout << "\t--service <endpoint path> : Start service at the given path." << std::endl; 993 std::cout << "\t--client <endpoint path> : Start client to the given path." << std::endl; 994 std::cout << "\t--op <read | write | echo> : Sepcify client operation mode." << std::endl; 995 std::cout << "\t--bs <block size bytes> : Sepcify block size to use." << std::endl; 996 std::cout << "\t--count <count> : Sepcify number of transactions to make." << std::endl; 997 std::cout << "\t--instances <count> : Specify number of service instances." << std::endl; 998 std::cout << "\t--threads <count> : Sepcify number of threads per instance." << std::endl; 999 std::cout << "\t--timeout <timeout ms | -1> : Timeout to wait for services." << std::endl; 1000 std::cout << "\t--trace : Enable systrace logging." 
<< std::endl; 1001 std::cout << "\t--warmup <iterations> : Busy loops before running benchmarks." << std::endl; 1002 // clang-format on 1003 return -1; 1004 } 1005 1006 } // anonymous namespace 1007 1008 int main(int argc, char** argv) { 1009 logging::LoggingSettings logging_settings; 1010 logging_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG; 1011 logging::InitLogging(logging_settings); 1012 1013 int getopt_code; 1014 int option_index; 1015 std::string option = ""; 1016 std::string command = ""; 1017 std::string command_argument = ""; 1018 bool tracing_enabled = false; 1019 1020 // Process command line options. 1021 while ((getopt_code = 1022 getopt_long(argc, argv, "", long_options, &option_index)) != -1) { 1023 option = long_options[option_index].name; 1024 VLOG(1) << "option=" << option; 1025 switch (getopt_code) { 1026 case 0: 1027 if (option == kOptionVerbose) { 1028 ProgramOptions.verbose = true; 1029 logging::SetMinLogLevel(-1); 1030 } else if (option == kOptionOpcode) { 1031 ParseOpcodeOption(optarg); 1032 } else if (option == kOptionBlocksize) { 1033 ProgramOptions.blocksize = std::stoi(optarg); 1034 if (ProgramOptions.blocksize < 0) { 1035 std::cerr << "Invalid blocksize argument: " 1036 << ProgramOptions.blocksize << std::endl; 1037 return -EINVAL; 1038 } 1039 } else if (option == kOptionCount) { 1040 ProgramOptions.count = std::stoi(optarg); 1041 if (ProgramOptions.count < 1) { 1042 std::cerr << "Invalid count argument: " << ProgramOptions.count 1043 << std::endl; 1044 return -EINVAL; 1045 } 1046 } else if (option == kOptionThreads) { 1047 ProgramOptions.threads = std::stoi(optarg); 1048 if (ProgramOptions.threads < 1) { 1049 std::cerr << "Invalid threads argument: " << ProgramOptions.threads 1050 << std::endl; 1051 return -EINVAL; 1052 } 1053 } else if (option == kOptionInstances) { 1054 ProgramOptions.instances = std::stoi(optarg); 1055 if (ProgramOptions.instances < 1) { 1056 std::cerr << "Invalid instances argument: " 1057 << 
ProgramOptions.instances << std::endl; 1058 return -EINVAL; 1059 } 1060 } else if (option == kOptionTimeout) { 1061 ProgramOptions.timeout = std::stoi(optarg); 1062 } else if (option == kOptionTrace) { 1063 tracing_enabled = true; 1064 } else if (option == kOptionWarmup) { 1065 ProgramOptions.warmup = std::stoi(optarg); 1066 } else { 1067 command = option; 1068 if (optarg) 1069 command_argument = optarg; 1070 } 1071 break; 1072 } 1073 } 1074 1075 // Setup ATRACE/systrace based on command line. 1076 atrace_setup(); 1077 atrace_set_tracing_enabled(tracing_enabled); 1078 1079 VLOG(1) << "command=" << command << " command_argument=" << command_argument; 1080 1081 if (command == "") { 1082 return Usage(argv[0]); 1083 } else if (command == kOptionService) { 1084 return ServiceCommand(command_argument); 1085 } else if (command == kOptionClient) { 1086 return ClientCommand(command_argument); 1087 } else { 1088 return Usage(argv[0]); 1089 } 1090 } 1091