Home | History | Annotate | Download | only in qps
      1 #!/usr/bin/env ruby
      2 
      3 # Copyright 2017 gRPC authors.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # Proxy of worker service implementation for running a PHP client
     18 
     19 this_dir = File.expand_path(File.dirname(__FILE__))
     20 lib_dir = File.join(File.dirname(this_dir), 'lib')
     21 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
     22 $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
     23 
     24 require 'grpc'
     25 require 'optparse'
     26 require 'histogram'
     27 require 'etc'
     28 require 'facter'
     29 require 'qps-common'
     30 require 'src/proto/grpc/testing/worker_service_services_pb'
     31 require 'src/proto/grpc/testing/proxy-service_services_pb'
     32 
     33 class ProxyBenchmarkClientServiceImpl < Grpc::Testing::ProxyClientService::Service
     34   def initialize(port, c_ext, php_client_bin)
     35     @mytarget = "localhost:" + port.to_s
     36     @use_c_ext = c_ext
     37     @php_client_bin = php_client_bin
     38   end
     39   def setup(config)
     40     @config = config
     41     @histres = config.histogram_params.resolution
     42     @histmax = config.histogram_params.max_possible
     43     @histogram = Histogram.new(@histres, @histmax)
     44     @start_time = Time.now
     45     @php_pid = Array.new(@config.client_channels)
     46     (0..@config.client_channels-1).each do |chan|
     47       Thread.new {
     48         if @use_c_ext
     49           puts "Use protobuf c extension"
     50           command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) +
     51             "/../../php/tests/qps/vendor/google/protobuf/php/ext/google/protobuf/modules/protobuf.so " +
     52             "-d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " +
     53             File.expand_path(File.dirname(__FILE__)) + "/" + @php_client_bin + " " + @mytarget + " #{chan%@config.server_targets.length}"
     54         else
     55           puts "Use protobuf php extension"
     56           command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " +
     57             File.expand_path(File.dirname(__FILE__)) + "/" + @php_client_bin + " " + @mytarget + " #{chan%@config.server_targets.length}"
     58         end
     59         puts "[ruby proxy] Starting #{chan}th php-client command use c protobuf #{@use_c_ext}: " + command
     60         @php_pid[chan] = spawn(command)
     61         while true
     62           sleep
     63         end
     64       }
     65     end
     66   end
     67   def stop
     68     (0..@config.client_channels-1).each do |chan|
     69       Process.kill("TERM", @php_pid[chan])
     70       Process.wait(@php_pid[chan])
     71     end
     72   end
     73   def get_config(_args, _call)
     74     @config
     75   end
     76   def report_time(call)
     77     call.each_remote_read do |lat|
     78       @histogram.add((lat.latency)*1e9)
     79     end
     80     Grpc::Testing::Void.new
     81   end
     82   def report_hist(call)
     83     call.each_remote_read do |lat|
     84       @histogram.merge(lat)
     85     end
     86     Grpc::Testing::Void.new
     87   end
     88   def mark(reset)
     89     lat = Grpc::Testing::HistogramData.new(
     90       bucket: @histogram.contents,
     91       min_seen: @histogram.minimum,
     92       max_seen: @histogram.maximum,
     93       sum: @histogram.sum,
     94       sum_of_squares: @histogram.sum_of_squares,
     95       count: @histogram.count
     96     )
     97     elapsed = Time.now-@start_time
     98     if reset
     99       @start_time = Time.now
    100       @histogram = Histogram.new(@histres, @histmax)
    101     end
    102     Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed)
    103   end
    104 end
    105 
    106 class ProxyWorkerServiceImpl < Grpc::Testing::WorkerService::Service
    107   def cpu_cores
    108     Facter.value('processors')['count']
    109   end
    110   # Leave run_server unimplemented since this proxies for a client only.
    111   # If the driver tries to use this as a server, it will get an unimplemented
    112   # status return value.
    113   def run_client(reqs)
    114     q = EnumeratorQueue.new(self)
    115     Thread.new {
    116       reqs.each do |req|
    117         case req.argtype.to_s
    118         when 'setup'
    119           @bmc.setup(req.setup)
    120           q.push(Grpc::Testing::ClientStatus.new(stats: @bmc.mark(false)))
    121         when 'mark'
    122           q.push(Grpc::Testing::ClientStatus.new(stats:
    123                                                    @bmc.mark(req.mark.reset)))
    124         end
    125       end
    126       @bmc.stop
    127       q.push(self)
    128     }
    129     q.each_item
    130   end
    131   def core_count(_args, _call)
    132     Grpc::Testing::CoreResponse.new(cores: cpu_cores)
    133   end
    134   def quit_worker(_args, _call)
    135     Thread.new {
    136       sleep 3
    137       @server.stop
    138     }
    139     Grpc::Testing::Void.new
    140   end
    141   def initialize(s, bmc)
    142     @server = s
    143     @bmc = bmc
    144   end
    145 end
    146 
    147 def proxymain
    148   options = {
    149     'driver_port' => 0,
    150     'php_client_bin' => '../../php/tests/qps/client.php'
    151   }
    152   OptionParser.new do |opts|
    153     opts.banner = 'Usage: [--driver_port <port>]'
    154     opts.on('--driver_port PORT', '<port>') do |v|
    155       options['driver_port'] = v
    156     end
    157     opts.on("-c", "--[no-]use_protobuf_c_extension", "Use protobuf C-extention") do |c|
    158       options[:c_ext] = c
    159     end
    160     opts.on("-b", "--php_client_bin [FILE]",
    161       "PHP client to execute; path relative to this script") do |c|
    162       options['php_client_bin'] = c
    163     end
    164   end.parse!
    165 
    166   # Configure any errors with client or server child threads to surface
    167   Thread.abort_on_exception = true
    168 
    169   # Make sure proxy_server can handle the large number of calls in benchmarks
    170   s = GRPC::RpcServer.new(pool_size: 1024)
    171   port = s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
    172                           :this_port_is_insecure)
    173   bmc = ProxyBenchmarkClientServiceImpl.new(port, options[:c_ext], options['php_client_bin'])
    174   s.handle(bmc)
    175   s.handle(ProxyWorkerServiceImpl.new(s, bmc))
    176   s.run
    177 end
    178 
    179 proxymain
    180