1 #!/usr/bin/env ruby 2 3 # Copyright 2016 gRPC authors. 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 17 # Worker and worker service implementation 18 19 this_dir = File.expand_path(File.dirname(__FILE__)) 20 lib_dir = File.join(File.dirname(this_dir), 'lib') 21 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) 22 $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) 23 24 require 'grpc' 25 require 'optparse' 26 require 'histogram' 27 require 'etc' 28 require 'facter' 29 require 'client' 30 require 'qps-common' 31 require 'server' 32 require 'src/proto/grpc/testing/worker_service_services_pb' 33 34 class WorkerServiceImpl < Grpc::Testing::WorkerService::Service 35 def cpu_cores 36 Facter.value('processors')['count'] 37 end 38 def run_server(reqs) 39 q = EnumeratorQueue.new(self) 40 Thread.new { 41 bms = '' 42 gtss = Grpc::Testing::ServerStatus 43 reqs.each do |req| 44 case req.argtype.to_s 45 when 'setup' 46 bms = BenchmarkServer.new(req.setup, @server_port) 47 q.push(gtss.new(stats: bms.mark(false), port: bms.get_port)) 48 when 'mark' 49 q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores)) 50 end 51 end 52 bms.stop 53 q.push(self) 54 } 55 q.each_item 56 end 57 def run_client(reqs) 58 q = EnumeratorQueue.new(self) 59 Thread.new { 60 client = '' 61 reqs.each do |req| 62 case req.argtype.to_s 63 when 'setup' 64 client = BenchmarkClient.new(req.setup) 65 q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false))) 66 when 'mark' 67 q.push(Grpc::Testing::ClientStatus.new(stats: 68 client.mark(req.mark.reset))) 69 end 70 end 71 client.shutdown 72 q.push(self) 73 } 74 q.each_item 75 end 76 def core_count(_args, _call) 77 Grpc::Testing::CoreResponse.new(cores: cpu_cores) 78 end 79 def quit_worker(_args, _call) 80 @shutdown_thread = Thread.new { 81 @server.stop 82 } 83 Grpc::Testing::Void.new 84 end 85 def initialize(s, sp) 86 @server = s 87 @server_port = sp 88 end 89 def join_shutdown_thread 90 @shutdown_thread.join 91 end 92 end 93 94 def main 95 options = { 96 'driver_port' => 0, 97 'server_port' => 0 98 } 99 OptionParser.new do |opts| 100 opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]' 101 opts.on('--driver_port PORT', '<port>') do |v| 102 options['driver_port'] = v 103 end 104 opts.on('--server_port PORT', '<port>') do |v| 105 options['server_port'] = v 106 end 107 end.parse! 108 109 # Configure any errors with client or server child threads to surface 110 Thread.abort_on_exception = true 111 112 s = GRPC::RpcServer.new(poll_period: 3) 113 s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, 114 :this_port_is_insecure) 115 worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i) 116 s.handle(worker_service) 117 s.run 118 worker_service.join_shutdown_thread 119 end 120 121 main 122