Home | History | Annotate | Download | only in qps
      1 #!/usr/bin/env ruby
      2 
      3 # Copyright 2016 gRPC authors.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # Worker and worker service implementation
     18 
     19 this_dir = File.expand_path(File.dirname(__FILE__))
     20 lib_dir = File.join(File.dirname(this_dir), 'lib')
     21 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
     22 $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
     23 
     24 require 'grpc'
     25 require 'optparse'
     26 require 'histogram'
     27 require 'etc'
     28 require 'facter'
     29 require 'client'
     30 require 'qps-common'
     31 require 'server'
     32 require 'src/proto/grpc/testing/worker_service_services_pb'
     33 
     34 class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
     35   def cpu_cores
     36     Facter.value('processors')['count']
     37   end
     38   def run_server(reqs)
     39     q = EnumeratorQueue.new(self)
     40     Thread.new {
     41       bms = ''
     42       gtss = Grpc::Testing::ServerStatus
     43       reqs.each do |req|
     44         case req.argtype.to_s
     45         when 'setup'
     46           bms = BenchmarkServer.new(req.setup, @server_port)
     47           q.push(gtss.new(stats: bms.mark(false), port: bms.get_port))
     48         when 'mark'
     49           q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
     50         end
     51       end
     52       bms.stop
     53       q.push(self)
     54     }
     55     q.each_item
     56   end
     57   def run_client(reqs)
     58     q = EnumeratorQueue.new(self)
     59     Thread.new {
     60       client = ''
     61       reqs.each do |req|
     62         case req.argtype.to_s
     63         when 'setup'
     64           client = BenchmarkClient.new(req.setup)
     65           q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
     66         when 'mark'
     67           q.push(Grpc::Testing::ClientStatus.new(stats:
     68                                                    client.mark(req.mark.reset)))
     69         end
     70       end
     71       client.shutdown
     72       q.push(self)
     73     }
     74     q.each_item
     75   end
     76   def core_count(_args, _call)
     77     Grpc::Testing::CoreResponse.new(cores: cpu_cores)
     78   end
     79   def quit_worker(_args, _call)
     80     @shutdown_thread = Thread.new {
     81       @server.stop
     82     }
     83     Grpc::Testing::Void.new
     84   end
     85   def initialize(s, sp)
     86     @server = s
     87     @server_port = sp
     88   end
     89   def join_shutdown_thread
     90     @shutdown_thread.join
     91   end
     92 end
     93 
     94 def main
     95   options = {
     96     'driver_port' => 0,
     97     'server_port' => 0
     98   }
     99   OptionParser.new do |opts|
    100     opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
    101     opts.on('--driver_port PORT', '<port>') do |v|
    102       options['driver_port'] = v
    103     end
    104     opts.on('--server_port PORT', '<port>') do |v|
    105       options['server_port'] = v
    106     end
    107   end.parse!
    108 
    109   # Configure any errors with client or server child threads to surface
    110   Thread.abort_on_exception = true
    111   
    112   s = GRPC::RpcServer.new(poll_period: 3)
    113   s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
    114                    :this_port_is_insecure)
    115   worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i)
    116   s.handle(worker_service)
    117   s.run
    118   worker_service.join_shutdown_thread
    119 end
    120 
    121 main
    122