Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env ruby
      2 
      3 # Copyright 2015 gRPC authors.
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #     http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # client is a testing tool that accesses a gRPC interop testing server and runs
     18 # a test on it.
     19 #
     20 # Helps validate interoperation b/w different gRPC implementations.
     21 #
     22 # Usage: $ path/to/client.rb --server_host=<hostname> \
     23 #                            --server_port=<port> \
     24 #                            --test_case=<testcase_name>
     25 
     26 # These lines are required for the generated files to load grpc
     27 this_dir = File.expand_path(File.dirname(__FILE__))
     28 lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
     29 pb_dir = File.dirname(this_dir)
     30 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
     31 $LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
     32 
     33 require 'optparse'
     34 require 'logger'
     35 
     36 require_relative '../../lib/grpc'
     37 require 'googleauth'
     38 require 'google/protobuf'
     39 
     40 require_relative '../src/proto/grpc/testing/empty_pb'
     41 require_relative '../src/proto/grpc/testing/messages_pb'
     42 require_relative '../src/proto/grpc/testing/test_services_pb'
     43 
     44 AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
     45 
     46 # RubyLogger defines a logger for gRPC based on the standard ruby logger.
     47 module RubyLogger
     48   def logger
     49     LOGGER
     50   end
     51 
     52   LOGGER = Logger.new(STDOUT)
     53   LOGGER.level = Logger::INFO
     54 end
     55 
     56 # GRPC is the general RPC module
     57 module GRPC
     58   # Inject the noop #logger if no module-level logger method has been injected.
     59   extend RubyLogger
     60 end
     61 
     62 # AssertionError is use to indicate interop test failures.
     63 class AssertionError < RuntimeError; end
     64 
     65 # Fails with AssertionError if the block does evaluate to true
     66 def assert(msg = 'unknown cause')
     67   fail 'No assertion block provided' unless block_given?
     68   fail AssertionError, msg unless yield
     69 end
     70 
     71 # loads the certificates used to access the test server securely.
     72 def load_test_certs
     73   this_dir = File.expand_path(File.dirname(__FILE__))
     74   data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata')
     75   files = ['ca.pem', 'server1.key', 'server1.pem']
     76   files.map { |f| File.open(File.join(data_dir, f)).read }
     77 end
     78 
     79 # creates SSL Credentials from the test certificates.
     80 def test_creds
     81   certs = load_test_certs
     82   GRPC::Core::ChannelCredentials.new(certs[0])
     83 end
     84 
     85 # creates SSL Credentials from the production certificates.
     86 def prod_creds
     87   GRPC::Core::ChannelCredentials.new()
     88 end
     89 
     90 # creates the SSL Credentials.
     91 def ssl_creds(use_test_ca)
     92   return test_creds if use_test_ca
     93   prod_creds
     94 end
     95 
     96 # creates a test stub that accesses host:port securely.
     97 def create_stub(opts)
     98   address = "#{opts.host}:#{opts.port}"
     99 
    100   # Provide channel args that request compression by default
    101   # for compression interop tests
    102   if ['client_compressed_unary',
    103       'client_compressed_streaming'].include?(opts.test_case)
    104     compression_options =
    105       GRPC::Core::CompressionOptions.new(default_algorithm: :gzip)
    106     compression_channel_args = compression_options.to_channel_arg_hash
    107   else
    108     compression_channel_args = {}
    109   end
    110 
    111   if opts.secure
    112     creds = ssl_creds(opts.use_test_ca)
    113     stub_opts = {
    114       channel_args: {
    115         GRPC::Core::Channel::SSL_TARGET => opts.host_override
    116       }
    117     }
    118 
    119     # Add service account creds if specified
    120     wants_creds = %w(all compute_engine_creds service_account_creds)
    121     if wants_creds.include?(opts.test_case)
    122       unless opts.oauth_scope.nil?
    123         auth_creds = Google::Auth.get_application_default(opts.oauth_scope)
    124         call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc)
    125         creds = creds.compose call_creds
    126       end
    127     end
    128 
    129     if opts.test_case == 'oauth2_auth_token'
    130       auth_creds = Google::Auth.get_application_default(opts.oauth_scope)
    131       kw = auth_creds.updater_proc.call({})  # gives as an auth token
    132 
    133       # use a metadata update proc that just adds the auth token.
    134       call_creds = GRPC::Core::CallCredentials.new(proc { |md| md.merge(kw) })
    135       creds = creds.compose call_creds
    136     end
    137 
    138     if opts.test_case == 'jwt_token_creds'  # don't use a scope
    139       auth_creds = Google::Auth.get_application_default
    140       call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc)
    141       creds = creds.compose call_creds
    142     end
    143 
    144     GRPC.logger.info("... connecting securely to #{address}")
    145     stub_opts[:channel_args].merge!(compression_channel_args)
    146     if opts.test_case == "unimplemented_service"
    147       Grpc::Testing::UnimplementedService::Stub.new(address, creds, **stub_opts)
    148     else
    149       Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts)
    150     end
    151   else
    152     GRPC.logger.info("... connecting insecurely to #{address}")
    153     if opts.test_case == "unimplemented_service"
    154       Grpc::Testing::UnimplementedService::Stub.new(
    155         address,
    156         :this_channel_is_insecure,
    157         channel_args: compression_channel_args
    158       )
    159     else
    160       Grpc::Testing::TestService::Stub.new(
    161         address,
    162         :this_channel_is_insecure,
    163         channel_args: compression_channel_args
    164       )
    165     end
    166   end
    167 end
    168 
    169 # produces a string of null chars (\0) of length l.
    170 def nulls(l)
    171   fail 'requires #{l} to be +ve' if l < 0
    172   [].pack('x' * l).force_encoding('ascii-8bit')
    173 end
    174 
    175 # a PingPongPlayer implements the ping pong bidi test.
    176 class PingPongPlayer
    177   include Grpc::Testing
    178   include Grpc::Testing::PayloadType
    179   attr_accessor :queue
    180   attr_accessor :canceller_op
    181 
    182   # reqs is the enumerator over the requests
    183   def initialize(msg_sizes)
    184     @queue = Queue.new
    185     @msg_sizes = msg_sizes
    186     @canceller_op = nil  # used to cancel after the first response
    187   end
    188 
    189   def each_item
    190     return enum_for(:each_item) unless block_given?
    191     req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters  # short
    192     count = 0
    193     @msg_sizes.each do |m|
    194       req_size, resp_size = m
    195       req = req_cls.new(payload: Payload.new(body: nulls(req_size)),
    196                         response_type: :COMPRESSABLE,
    197                         response_parameters: [p_cls.new(size: resp_size)])
    198       yield req
    199       resp = @queue.pop
    200       assert('payload type is wrong') { :COMPRESSABLE == resp.payload.type }
    201       assert("payload body #{count} has the wrong length") do
    202         resp_size == resp.payload.body.length
    203       end
    204       p "OK: ping_pong #{count}"
    205       count += 1
    206       unless @canceller_op.nil?
    207         canceller_op.cancel
    208         break
    209       end
    210     end
    211   end
    212 end
    213 
    214 class BlockingEnumerator
    215   include Grpc::Testing
    216   include Grpc::Testing::PayloadType
    217 
    218   def initialize(req_size, sleep_time)
    219     @req_size = req_size
    220     @sleep_time = sleep_time
    221   end
    222 
    223   def each_item
    224     return enum_for(:each_item) unless block_given?
    225     req_cls = StreamingOutputCallRequest
    226     req = req_cls.new(payload: Payload.new(body: nulls(@req_size)))
    227     yield req
    228     # Sleep until after the deadline should have passed
    229     sleep(@sleep_time)
    230   end
    231 end
    232 
    233 # Intended to be used to wrap a call_op, and to adjust
    234 # the write flag of the call_op in between messages yielded to it.
    235 class WriteFlagSettingStreamingInputEnumerable
    236   attr_accessor :call_op
    237 
    238   def initialize(requests_and_write_flags)
    239     @requests_and_write_flags = requests_and_write_flags
    240   end
    241 
    242   def each
    243     @requests_and_write_flags.each do |request_and_flag|
    244       @call_op.write_flag = request_and_flag[:write_flag]
    245       yield request_and_flag[:request]
    246     end
    247   end
    248 end
    249 
    250 # defines methods corresponding to each interop test case.
    251 class NamedTests
    252   include Grpc::Testing
    253   include Grpc::Testing::PayloadType
    254   include GRPC::Core::MetadataKeys
    255 
    256   def initialize(stub, args)
    257     @stub = stub
    258     @args = args
    259   end
    260 
    261   def empty_unary
    262     resp = @stub.empty_call(Empty.new)
    263     assert('empty_unary: invalid response') { resp.is_a?(Empty) }
    264   end
    265 
    266   def large_unary
    267     perform_large_unary
    268   end
    269 
    270   def client_compressed_unary
    271     # first request used also for the probe
    272     req_size, wanted_response_size = 271_828, 314_159
    273     expect_compressed = BoolValue.new(value: true)
    274     payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
    275     req = SimpleRequest.new(response_type: :COMPRESSABLE,
    276                             response_size: wanted_response_size,
    277                             payload: payload,
    278                             expect_compressed: expect_compressed)
    279 
    280     # send a probe to see if CompressedResponse is supported on the server
    281     send_probe_for_compressed_request_support do
    282       request_uncompressed_args = {
    283         COMPRESSION_REQUEST_ALGORITHM => 'identity'
    284       }
    285       @stub.unary_call(req, metadata: request_uncompressed_args)
    286     end
    287 
    288     # make a call with a compressed message
    289     resp = @stub.unary_call(req)
    290     assert('Expected second unary call with compression to work') do
    291       resp.payload.body.length == wanted_response_size
    292     end
    293 
    294     # make a call with an uncompressed message
    295     stub_options = {
    296       COMPRESSION_REQUEST_ALGORITHM => 'identity'
    297     }
    298 
    299     req = SimpleRequest.new(
    300       response_type: :COMPRESSABLE,
    301       response_size: wanted_response_size,
    302       payload: payload,
    303       expect_compressed: BoolValue.new(value: false)
    304     )
    305 
    306     resp = @stub.unary_call(req, metadata: stub_options)
    307     assert('Expected second unary call with compression to work') do
    308       resp.payload.body.length == wanted_response_size
    309     end
    310   end
    311 
    312   def service_account_creds
    313     # ignore this test if the oauth options are not set
    314     if @args.oauth_scope.nil?
    315       p 'NOT RUN: service_account_creds; no service_account settings'
    316       return
    317     end
    318     json_key = File.read(ENV[AUTH_ENV])
    319     wanted_email = MultiJson.load(json_key)['client_email']
    320     resp = perform_large_unary(fill_username: true,
    321                                fill_oauth_scope: true)
    322     assert("#{__callee__}: bad username") { wanted_email == resp.username }
    323     assert("#{__callee__}: bad oauth scope") do
    324       @args.oauth_scope.include?(resp.oauth_scope)
    325     end
    326   end
    327 
    328   def jwt_token_creds
    329     json_key = File.read(ENV[AUTH_ENV])
    330     wanted_email = MultiJson.load(json_key)['client_email']
    331     resp = perform_large_unary(fill_username: true)
    332     assert("#{__callee__}: bad username") { wanted_email == resp.username }
    333   end
    334 
    335   def compute_engine_creds
    336     resp = perform_large_unary(fill_username: true,
    337                                fill_oauth_scope: true)
    338     assert("#{__callee__}: bad username") do
    339       @args.default_service_account == resp.username
    340     end
    341   end
    342 
    343   def oauth2_auth_token
    344     resp = perform_large_unary(fill_username: true,
    345                                fill_oauth_scope: true)
    346     json_key = File.read(ENV[AUTH_ENV])
    347     wanted_email = MultiJson.load(json_key)['client_email']
    348     assert("#{__callee__}: bad username") { wanted_email == resp.username }
    349     assert("#{__callee__}: bad oauth scope") do
    350       @args.oauth_scope.include?(resp.oauth_scope)
    351     end
    352   end
    353 
    354   def per_rpc_creds
    355     auth_creds = Google::Auth.get_application_default(@args.oauth_scope)
    356     update_metadata = proc do |md|
    357       kw = auth_creds.updater_proc.call({})
    358     end
    359 
    360     call_creds = GRPC::Core::CallCredentials.new(update_metadata)
    361 
    362     resp = perform_large_unary(fill_username: true,
    363                                fill_oauth_scope: true,
    364                                credentials: call_creds)
    365     json_key = File.read(ENV[AUTH_ENV])
    366     wanted_email = MultiJson.load(json_key)['client_email']
    367     assert("#{__callee__}: bad username") { wanted_email == resp.username }
    368     assert("#{__callee__}: bad oauth scope") do
    369       @args.oauth_scope.include?(resp.oauth_scope)
    370     end
    371   end
    372 
    373   def client_streaming
    374     msg_sizes = [27_182, 8, 1828, 45_904]
    375     wanted_aggregate_size = 74_922
    376     reqs = msg_sizes.map do |x|
    377       req = Payload.new(body: nulls(x))
    378       StreamingInputCallRequest.new(payload: req)
    379     end
    380     resp = @stub.streaming_input_call(reqs)
    381     assert("#{__callee__}: aggregate payload size is incorrect") do
    382       wanted_aggregate_size == resp.aggregated_payload_size
    383     end
    384   end
    385 
    386   def client_compressed_streaming
    387     # first request used also by the probe
    388     first_request = StreamingInputCallRequest.new(
    389       payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)),
    390       expect_compressed: BoolValue.new(value: true)
    391     )
    392 
    393     # send a probe to see if CompressedResponse is supported on the server
    394     send_probe_for_compressed_request_support do
    395       request_uncompressed_args = {
    396         COMPRESSION_REQUEST_ALGORITHM => 'identity'
    397       }
    398       @stub.streaming_input_call([first_request],
    399                                  metadata: request_uncompressed_args)
    400     end
    401 
    402     second_request = StreamingInputCallRequest.new(
    403       payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
    404       expect_compressed: BoolValue.new(value: false)
    405     )
    406 
    407     # Create the requests messages and the corresponding write flags
    408     # for each message
    409     requests = WriteFlagSettingStreamingInputEnumerable.new([
    410       { request: first_request,
    411         write_flag: 0 },
    412       { request: second_request,
    413         write_flag: GRPC::Core::WriteFlags::NO_COMPRESS }
    414     ])
    415 
    416     # Create the call_op, pass it to the requests enumerable, and
    417     # run the call
    418     call_op = @stub.streaming_input_call(requests,
    419                                          return_op: true)
    420     requests.call_op = call_op
    421     resp = call_op.execute
    422 
    423     wanted_aggregate_size = 73_086
    424 
    425     assert("#{__callee__}: aggregate payload size is incorrect") do
    426       wanted_aggregate_size == resp.aggregated_payload_size
    427     end
    428   end
    429 
    430   def server_streaming
    431     msg_sizes = [31_415, 9, 2653, 58_979]
    432     response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) }
    433     req = StreamingOutputCallRequest.new(response_type: :COMPRESSABLE,
    434                                          response_parameters: response_spec)
    435     resps = @stub.streaming_output_call(req)
    436     resps.each_with_index do |r, i|
    437       assert("#{__callee__}: too many responses") { i < msg_sizes.length }
    438       assert("#{__callee__}: payload body #{i} has the wrong length") do
    439         msg_sizes[i] == r.payload.body.length
    440       end
    441       assert("#{__callee__}: payload type is wrong") do
    442         :COMPRESSABLE == r.payload.type
    443       end
    444     end
    445   end
    446 
    447   def ping_pong
    448     msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
    449     ppp = PingPongPlayer.new(msg_sizes)
    450     resps = @stub.full_duplex_call(ppp.each_item)
    451     resps.each { |r| ppp.queue.push(r) }
    452   end
    453 
    454   def timeout_on_sleeping_server
    455     enum = BlockingEnumerator.new(27_182, 2)
    456     deadline = GRPC::Core::TimeConsts::from_relative_time(1)
    457     resps = @stub.full_duplex_call(enum.each_item, deadline: deadline)
    458     resps.each { } # wait to receive each request (or timeout)
    459     fail 'Should have raised GRPC::DeadlineExceeded'
    460   rescue GRPC::DeadlineExceeded
    461   end
    462 
    463   def empty_stream
    464     ppp = PingPongPlayer.new([])
    465     resps = @stub.full_duplex_call(ppp.each_item)
    466     count = 0
    467     resps.each do |r|
    468       ppp.queue.push(r)
    469       count += 1
    470     end
    471     assert("#{__callee__}: too many responses expected 0") do
    472       count == 0
    473     end
    474   end
    475 
    476   def cancel_after_begin
    477     msg_sizes = [27_182, 8, 1828, 45_904]
    478     reqs = msg_sizes.map do |x|
    479       req = Payload.new(body: nulls(x))
    480       StreamingInputCallRequest.new(payload: req)
    481     end
    482     op = @stub.streaming_input_call(reqs, return_op: true)
    483     op.cancel
    484     op.execute
    485     fail 'Should have raised GRPC:Cancelled'
    486   rescue GRPC::Cancelled
    487     assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
    488   end
    489 
    490   def cancel_after_first_response
    491     msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
    492     ppp = PingPongPlayer.new(msg_sizes)
    493     op = @stub.full_duplex_call(ppp.each_item, return_op: true)
    494     ppp.canceller_op = op  # causes ppp to cancel after the 1st message
    495     op.execute.each { |r| ppp.queue.push(r) }
    496     fail 'Should have raised GRPC:Cancelled'
    497   rescue GRPC::Cancelled
    498     assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
    499     op.wait
    500   end
    501 
    502   def unimplemented_method
    503     begin
    504       resp = @stub.unimplemented_call(Empty.new)
    505     rescue GRPC::Unimplemented => e
    506       return
    507     rescue Exception => e
    508       fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
    509     end
    510     fail AssertionError, "GRPC::Unimplemented should have been raised. Was not."
    511   end
    512 
    513   def unimplemented_service
    514     begin
    515       resp = @stub.unimplemented_call(Empty.new)
    516     rescue GRPC::Unimplemented => e
    517       return
    518     rescue Exception => e
    519       fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
    520     end
    521     fail AssertionError, "GRPC::Unimplemented should have been raised. Was not."
    522   end
    523 
    524   def status_code_and_message
    525 
    526     # Function wide constants.
    527     message = "test status method"
    528     code = GRPC::Core::StatusCodes::UNKNOWN
    529 
    530     # Testing with UnaryCall.
    531     payload = Payload.new(type: :COMPRESSABLE, body: nulls(1))
    532     echo_status = EchoStatus.new(code: code, message: message)
    533     req = SimpleRequest.new(response_type: :COMPRESSABLE,
    534 			    response_size: 1,
    535 			    payload: payload,
    536 			    response_status: echo_status)
    537     seen_correct_exception = false
    538     begin
    539       resp = @stub.unary_call(req)
    540     rescue GRPC::Unknown => e
    541       if e.details != message
    542 	      fail AssertionError,
    543 	        "Expected message #{message}. Received: #{e.details}"
    544       end
    545       seen_correct_exception = true
    546     rescue Exception => e
    547       fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
    548     end
    549 
    550     if not seen_correct_exception
    551       fail AssertionError, "Did not see expected status from UnaryCall"
    552     end
    553 
    554     # testing with FullDuplex
    555     req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters
    556     duplex_req = req_cls.new(payload: Payload.new(body: nulls(1)),
    557                   response_type: :COMPRESSABLE,
    558                   response_parameters: [p_cls.new(size: 1)],
    559                   response_status: echo_status)
    560     seen_correct_exception = false
    561     begin
    562       resp = @stub.full_duplex_call([duplex_req])
    563       resp.each { |r| }
    564     rescue GRPC::Unknown => e
    565       if e.details != message
    566         fail AssertionError,
    567           "Expected message #{message}. Received: #{e.details}"
    568       end
    569       seen_correct_exception = true
    570     rescue Exception => e
    571       fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
    572     end
    573 
    574     if not seen_correct_exception
    575       fail AssertionError, "Did not see expected status from FullDuplexCall"
    576     end
    577 
    578   end
    579 
    580 
    581   def custom_metadata
    582 
    583     # Function wide constants
    584     req_size, wanted_response_size = 271_828, 314_159
    585     initial_metadata_key = "x-grpc-test-echo-initial"
    586     initial_metadata_value = "test_initial_metadata_value"
    587     trailing_metadata_key = "x-grpc-test-echo-trailing-bin"
    588     trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b"
    589 
    590     metadata = {
    591       initial_metadata_key => initial_metadata_value,
    592       trailing_metadata_key => trailing_metadata_value
    593     }
    594 
    595     # Testing with UnaryCall
    596     payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
    597     req = SimpleRequest.new(response_type: :COMPRESSABLE,
    598 			    response_size: wanted_response_size,
    599 			    payload: payload)
    600 
    601     op = @stub.unary_call(req, metadata: metadata, return_op: true)
    602     op.execute
    603     if not op.metadata.has_key?(initial_metadata_key)
    604       fail AssertionError, "Expected initial metadata. None received"
    605     elsif op.metadata[initial_metadata_key] != metadata[initial_metadata_key]
    606       fail AssertionError, 
    607              "Expected initial metadata: #{metadata[initial_metadata_key]}. "\
    608              "Received: #{op.metadata[initial_metadata_key]}"
    609     end
    610     if not op.trailing_metadata.has_key?(trailing_metadata_key)
    611       fail AssertionError, "Expected trailing metadata. None received"
    612     elsif op.trailing_metadata[trailing_metadata_key] !=
    613           metadata[trailing_metadata_key]
    614       fail AssertionError, 
    615             "Expected trailing metadata: #{metadata[trailing_metadata_key]}. "\
    616             "Received: #{op.trailing_metadata[trailing_metadata_key]}"
    617     end
    618 
    619     # Testing with FullDuplex
    620     req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters
    621     duplex_req = req_cls.new(payload: Payload.new(body: nulls(req_size)),
    622                   response_type: :COMPRESSABLE,
    623                   response_parameters: [p_cls.new(size: wanted_response_size)])
    624 
    625     duplex_op = @stub.full_duplex_call([duplex_req], metadata: metadata,
    626                                         return_op: true)
    627     resp = duplex_op.execute
    628     resp.each { |r| } # ensures that the server sends trailing data
    629     duplex_op.wait
    630     if not duplex_op.metadata.has_key?(initial_metadata_key)
    631       fail AssertionError, "Expected initial metadata. None received"
    632     elsif duplex_op.metadata[initial_metadata_key] !=
    633           metadata[initial_metadata_key]
    634       fail AssertionError,
    635              "Expected initial metadata: #{metadata[initial_metadata_key]}. "\
    636              "Received: #{duplex_op.metadata[initial_metadata_key]}"
    637     end
    638     if not duplex_op.trailing_metadata[trailing_metadata_key]
    639       fail AssertionError, "Expected trailing metadata. None received"
    640     elsif duplex_op.trailing_metadata[trailing_metadata_key] !=
    641           metadata[trailing_metadata_key]
    642       fail AssertionError, 
    643           "Expected trailing metadata: #{metadata[trailing_metadata_key]}. "\
    644           "Received: #{duplex_op.trailing_metadata[trailing_metadata_key]}"
    645     end
    646 
    647   end
    648 
    649   def all
    650     all_methods = NamedTests.instance_methods(false).map(&:to_s)
    651     all_methods.each do |m|
    652       next if m == 'all' || m.start_with?('assert')
    653       p "TESTCASE: #{m}"
    654       method(m).call
    655     end
    656   end
    657 
    658   private
    659 
    660   def perform_large_unary(fill_username: false, fill_oauth_scope: false, **kw)
    661     req_size, wanted_response_size = 271_828, 314_159
    662     payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
    663     req = SimpleRequest.new(response_type: :COMPRESSABLE,
    664                             response_size: wanted_response_size,
    665                             payload: payload)
    666     req.fill_username = fill_username
    667     req.fill_oauth_scope = fill_oauth_scope
    668     resp = @stub.unary_call(req, **kw)
    669     assert('payload type is wrong') do
    670       :COMPRESSABLE == resp.payload.type
    671     end
    672     assert('payload body has the wrong length') do
    673       wanted_response_size == resp.payload.body.length
    674     end
    675     assert('payload body is invalid') do
    676       nulls(wanted_response_size) == resp.payload.body
    677     end
    678     resp
    679   end
    680 
    681   # Send probing message for compressed request on the server, to see
    682   # if it's implemented.
    683   def send_probe_for_compressed_request_support(&send_probe)
    684     bad_status_occurred = false
    685 
    686     begin
    687       send_probe.call
    688     rescue GRPC::BadStatus => e
    689       if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT
    690         bad_status_occurred = true
    691       else
    692         fail AssertionError, "Bad status received but code is #{e.code}"
    693       end
    694     rescue Exception => e
    695       fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
    696     end
    697 
    698     assert('CompressedRequest probe failed') do
    699       bad_status_occurred
    700     end
    701   end
    702 
    703 end
    704 
    705 # Args is used to hold the command line info.
    706 Args = Struct.new(:default_service_account, :host, :host_override,
    707                   :oauth_scope, :port, :secure, :test_case,
    708                   :use_test_ca)
    709 
    710 # validates the command line options, returning them as a Hash.
    711 def parse_args
    712   args = Args.new
    713   args.host_override = 'foo.test.google.fr'
    714   OptionParser.new do |opts|
    715     opts.on('--oauth_scope scope',
    716             'Scope for OAuth tokens') { |v| args['oauth_scope'] = v }
    717     opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
    718       args['host'] = v
    719     end
    720     opts.on('--default_service_account email_address',
    721             'email address of the default service account') do |v|
    722       args['default_service_account'] = v
    723     end
    724     opts.on('--server_host_override HOST_OVERRIDE',
    725             'override host via a HTTP header') do |v|
    726       args['host_override'] = v
    727     end
    728     opts.on('--server_port SERVER_PORT', 'server port') { |v| args['port'] = v }
    729     # instance_methods(false) gives only the methods defined in that class
    730     test_cases = NamedTests.instance_methods(false).map(&:to_s)
    731     test_case_list = test_cases.join(',')
    732     opts.on('--test_case CODE', test_cases, {}, 'select a test_case',
    733             "  (#{test_case_list})") { |v| args['test_case'] = v }
    734     opts.on('--use_tls USE_TLS', ['false', 'true'],
    735             'require a secure connection?') do |v|
    736       args['secure'] = v == 'true'
    737     end
    738     opts.on('--use_test_ca USE_TEST_CA', ['false', 'true'],
    739             'if secure, use the test certificate?') do |v|
    740       args['use_test_ca'] = v == 'true'
    741     end
    742   end.parse!
    743   _check_args(args)
    744 end
    745 
    746 def _check_args(args)
    747   %w(host port test_case).each do |a|
    748     if args[a].nil?
    749       fail(OptionParser::MissingArgument, "please specify --#{a}")
    750     end
    751   end
    752   args
    753 end
    754 
    755 def main
    756   opts = parse_args
    757   stub = create_stub(opts)
    758   NamedTests.new(stub, opts).method(opts['test_case']).call
    759   p "OK: #{opts['test_case']}"
    760 end
    761 
    762 if __FILE__ == $0
    763   main
    764 end
    765