Home | History | Annotate | Download | only in generic
      1 # Copyright 2015 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 require 'spec_helper'
     16 
     17 Thread.abort_on_exception = true
     18 
     19 def wakey_thread(&blk)
     20   n = GRPC::Notifier.new
     21   t = Thread.new do
     22     blk.call(n)
     23   end
     24   t.abort_on_exception = true
     25   n.wait
     26   t
     27 end
     28 
     29 def load_test_certs
     30   test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
     31   files = ['ca.pem', 'server1.key', 'server1.pem']
     32   files.map { |f| File.open(File.join(test_root, f)).read }
     33 end
     34 
     35 include GRPC::Core::StatusCodes
     36 include GRPC::Core::TimeConsts
     37 include GRPC::Core::CallOps
     38 
     39 # check that methods on a finished/closed call t crash
     40 def check_op_view_of_finished_client_call(op_view,
     41                                           expected_metadata,
     42                                           expected_trailing_metadata)
     43   # use read_response_stream to try to iterate through
     44   # possible response stream
     45   fail('need something to attempt reads') unless block_given?
     46   expect do
     47     resp = op_view.execute
     48     yield resp
     49   end.to raise_error(GRPC::Core::CallError)
     50 
     51   expect { op_view.start_call }.to raise_error(RuntimeError)
     52 
     53   sanity_check_values_of_accessors(op_view,
     54                                    expected_metadata,
     55                                    expected_trailing_metadata)
     56 
     57   expect do
     58     op_view.wait
     59     op_view.cancel
     60     op_view.write_flag = 1
     61   end.to_not raise_error
     62 end
     63 
     64 def sanity_check_values_of_accessors(op_view,
     65                                      expected_metadata,
     66                                      expected_trailing_metadata)
     67   expected_status = Struct::Status.new
     68   expected_status.code = 0
     69   expected_status.details = 'OK'
     70   expected_status.metadata = expected_trailing_metadata
     71 
     72   expect(op_view.status).to eq(expected_status)
     73   expect(op_view.metadata).to eq(expected_metadata)
     74   expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
     75 
     76   expect(op_view.cancelled?).to be(false)
     77   expect(op_view.write_flag).to be(nil)
     78 
     79   # The deadline attribute of a call can be either
     80   # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
     81   # TODO: fix so that the accessor always returns the same type.
     82   expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
     83          op_view.deadline.is_a?(Time)).to be(true)
     84 end
     85 
     86 def close_active_server_call(active_server_call)
     87   active_server_call.send(:set_input_stream_done)
     88   active_server_call.send(:set_output_stream_done)
     89 end
     90 
     91 describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
     92   let(:noop) { proc { |x| x } }
     93 
     94   before(:each) do
     95     Thread.abort_on_exception = true
     96     @server = nil
     97     @method = 'an_rpc_method'
     98     @pass = OK
     99     @fail = INTERNAL
    100     @metadata = { k1: 'v1', k2: 'v2' }
    101   end
    102 
    103   after(:each) do
    104     unless @server.nil?
    105       @server.shutdown_and_notify(from_relative_time(2))
    106       @server.close
    107     end
    108   end
    109 
    110   describe '#new' do
    111     let(:fake_host) { 'localhost:0' }
    112     it 'can be created from a host and args' do
    113       opts = { channel_args: { a_channel_arg: 'an_arg' } }
    114       blk = proc do
    115         GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    116       end
    117       expect(&blk).not_to raise_error
    118     end
    119 
    120     it 'can be created with an channel override' do
    121       opts = {
    122         channel_args: { a_channel_arg: 'an_arg' },
    123         channel_override: @ch
    124       }
    125       blk = proc do
    126         GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    127       end
    128       expect(&blk).not_to raise_error
    129     end
    130 
    131     it 'cannot be created with a bad channel override' do
    132       blk = proc do
    133         opts = {
    134           channel_args: { a_channel_arg: 'an_arg' },
    135           channel_override: Object.new
    136         }
    137         GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    138       end
    139       expect(&blk).to raise_error
    140     end
    141 
    142     it 'cannot be created with bad credentials' do
    143       blk = proc do
    144         opts = { channel_args: { a_channel_arg: 'an_arg' } }
    145         GRPC::ClientStub.new(fake_host, Object.new, **opts)
    146       end
    147       expect(&blk).to raise_error
    148     end
    149 
    150     it 'can be created with test test credentials' do
    151       certs = load_test_certs
    152       blk = proc do
    153         opts = {
    154           channel_args: {
    155             GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
    156             a_channel_arg: 'an_arg'
    157           }
    158         }
    159         creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
    160         GRPC::ClientStub.new(fake_host, creds,  **opts)
    161       end
    162       expect(&blk).to_not raise_error
    163     end
    164   end
    165 
    166   describe '#request_response', request_response: true do
    167     before(:each) do
    168       @sent_msg, @resp = 'a_msg', 'a_reply'
    169     end
    170 
    171     shared_examples 'request response' do
    172       it 'should send a request to/receive a reply from a server' do
    173         server_port = create_test_server
    174         th = run_request_response(@sent_msg, @resp, @pass)
    175         stub = GRPC::ClientStub.new("localhost:#{server_port}",
    176                                     :this_channel_is_insecure)
    177         expect(get_response(stub)).to eq(@resp)
    178         th.join
    179       end
    180 
    181       def metadata_test(md)
    182         server_port = create_test_server
    183         host = "localhost:#{server_port}"
    184         th = run_request_response(@sent_msg, @resp, @pass,
    185                                   expected_metadata: md)
    186         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    187         @metadata = md
    188         expect(get_response(stub)).to eq(@resp)
    189         th.join
    190       end
    191 
    192       it 'should send metadata to the server ok' do
    193         metadata_test(k1: 'v1', k2: 'v2')
    194       end
    195 
    196       # these tests mostly try to exercise when md might be allocated
    197       # instead of inlined
    198       it 'should send metadata with multiple large md to the server ok' do
    199         val_array = %w(
    200           '00000000000000000000000000000000000000000000000000000000000000',
    201           '11111111111111111111111111111111111111111111111111111111111111',
    202           '22222222222222222222222222222222222222222222222222222222222222',
    203         )
    204         md = {
    205           k1: val_array,
    206           k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
    207           k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
    208           k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
    209           keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
    210           'k66666666666666666666666666666666666666666666666666666' => 'v6',
    211           'k77777777777777777777777777777777777777777777777777777' => 'v7',
    212           'k88888888888888888888888888888888888888888888888888888' => 'v8'
    213         }
    214         metadata_test(md)
    215       end
    216 
    217       it 'should send a request when configured using an override channel' do
    218         server_port = create_test_server
    219         alt_host = "localhost:#{server_port}"
    220         th = run_request_response(@sent_msg, @resp, @pass)
    221         ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
    222         stub = GRPC::ClientStub.new('ignored-host',
    223                                     :this_channel_is_insecure,
    224                                     channel_override: ch)
    225         expect(get_response(stub)).to eq(@resp)
    226         th.join
    227       end
    228 
    229       it 'should raise an error if the status is not OK' do
    230         server_port = create_test_server
    231         host = "localhost:#{server_port}"
    232         th = run_request_response(@sent_msg, @resp, @fail)
    233         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    234         blk = proc { get_response(stub) }
    235         expect(&blk).to raise_error(GRPC::BadStatus)
    236         th.join
    237       end
    238 
    239       it 'should receive UNAVAILABLE if call credentials plugin fails' do
    240         server_port = create_secure_test_server
    241         server_started_notifier = GRPC::Notifier.new
    242         th = Thread.new do
    243           @server.start
    244           server_started_notifier.notify(nil)
    245           # Poll on the server so that the client connection can proceed.
    246           # We don't expect the server to actually accept a call though.
    247           expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
    248         end
    249         server_started_notifier.wait
    250 
    251         certs = load_test_certs
    252         secure_channel_creds = GRPC::Core::ChannelCredentials.new(
    253           certs[0], nil, nil)
    254         secure_stub_opts = {
    255           channel_args: {
    256             GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
    257           }
    258         }
    259         stub = GRPC::ClientStub.new("localhost:#{server_port}",
    260                                     secure_channel_creds, **secure_stub_opts)
    261 
    262         error_message = 'Failing call credentials callback'
    263         failing_auth = proc do
    264           fail error_message
    265         end
    266         creds = GRPC::Core::CallCredentials.new(failing_auth)
    267 
    268         unavailable_error_occurred = false
    269         begin
    270           get_response(stub, credentials: creds)
    271         rescue GRPC::Unavailable => e
    272           unavailable_error_occurred = true
    273           expect(e.details.include?(error_message)).to be true
    274         end
    275         expect(unavailable_error_occurred).to eq(true)
    276 
    277         @server.shutdown_and_notify(Time.now + 3)
    278         th.join
    279         @server.close
    280       end
    281 
    282       it 'should raise ArgumentError if metadata contains invalid values' do
    283         @metadata.merge!(k3: 3)
    284         server_port = create_test_server
    285         host = "localhost:#{server_port}"
    286         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    287         expect do
    288           get_response(stub)
    289         end.to raise_error(ArgumentError,
    290                            /Header values must be of type string or array/)
    291       end
    292     end
    293 
    294     describe 'without a call operation' do
    295       def get_response(stub, credentials: nil)
    296         GRPC.logger.info(credentials.inspect)
    297         stub.request_response(@method, @sent_msg, noop, noop,
    298                               metadata: @metadata,
    299                               credentials: credentials)
    300       end
    301 
    302       it_behaves_like 'request response'
    303     end
    304 
    305     describe 'via a call operation' do
    306       after(:each) do
    307         # make sure op.wait doesn't hang, even if there's a bad status
    308         @op.wait
    309       end
    310       def get_response(stub, run_start_call_first: false, credentials: nil)
    311         @op = stub.request_response(@method, @sent_msg, noop, noop,
    312                                     return_op: true,
    313                                     metadata: @metadata,
    314                                     deadline: from_relative_time(2),
    315                                     credentials: credentials)
    316         expect(@op).to be_a(GRPC::ActiveCall::Operation)
    317         @op.start_call if run_start_call_first
    318         result = @op.execute
    319         result
    320       end
    321 
    322       it_behaves_like 'request response'
    323 
    324       def run_op_view_metadata_test(run_start_call_first)
    325         server_port = create_test_server
    326         host = "localhost:#{server_port}"
    327 
    328         @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
    329         @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
    330         th = run_request_response(
    331           @sent_msg, @resp, @pass,
    332           expected_metadata: @metadata,
    333           server_initial_md: @server_initial_md,
    334           server_trailing_md: @server_trailing_md)
    335         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    336         expect(
    337           get_response(stub,
    338                        run_start_call_first: run_start_call_first)).to eq(@resp)
    339         th.join
    340       end
    341 
    342       it 'sends metadata to the server ok when running start_call first' do
    343         run_op_view_metadata_test(true)
    344         check_op_view_of_finished_client_call(
    345           @op, @server_initial_md, @server_trailing_md
    346         ) { |r| GRPC.logger.info(r) }
    347       end
    348 
    349       it 'does not crash when used after the call has been finished' do
    350         run_op_view_metadata_test(false)
    351         check_op_view_of_finished_client_call(
    352           @op, @server_initial_md, @server_trailing_md
    353         ) { |r| GRPC.logger.info(r) }
    354       end
    355     end
    356   end
    357 
    358   describe '#client_streamer', client_streamer: true do
    359     before(:each) do
    360       Thread.abort_on_exception = true
    361       server_port = create_test_server
    362       host = "localhost:#{server_port}"
    363       @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    364       @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
    365       @resp = 'a_reply'
    366     end
    367 
    368     shared_examples 'client streaming' do
    369       it 'should send requests to/receive a reply from a server' do
    370         th = run_client_streamer(@sent_msgs, @resp, @pass)
    371         expect(get_response(@stub)).to eq(@resp)
    372         th.join
    373       end
    374 
    375       it 'should send metadata to the server ok' do
    376         th = run_client_streamer(@sent_msgs, @resp, @pass,
    377                                  expected_metadata: @metadata)
    378         expect(get_response(@stub)).to eq(@resp)
    379         th.join
    380       end
    381 
    382       it 'should raise an error if the status is not ok' do
    383         th = run_client_streamer(@sent_msgs, @resp, @fail)
    384         blk = proc { get_response(@stub) }
    385         expect(&blk).to raise_error(GRPC::BadStatus)
    386         th.join
    387       end
    388 
    389       it 'should raise ArgumentError if metadata contains invalid values' do
    390         @metadata.merge!(k3: 3)
    391         expect do
    392           get_response(@stub)
    393         end.to raise_error(ArgumentError,
    394                            /Header values must be of type string or array/)
    395       end
    396     end
    397 
    398     describe 'without a call operation' do
    399       def get_response(stub)
    400         stub.client_streamer(@method, @sent_msgs, noop, noop,
    401                              metadata: @metadata)
    402       end
    403 
    404       it_behaves_like 'client streaming'
    405     end
    406 
    407     describe 'via a call operation' do
    408       after(:each) do
    409         # make sure op.wait doesn't hang, even if there's a bad status
    410         @op.wait
    411       end
    412       def get_response(stub, run_start_call_first: false)
    413         @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
    414                                    return_op: true, metadata: @metadata)
    415         expect(@op).to be_a(GRPC::ActiveCall::Operation)
    416         @op.start_call if run_start_call_first
    417         result = @op.execute
    418         result
    419       end
    420 
    421       it_behaves_like 'client streaming'
    422 
    423       def run_op_view_metadata_test(run_start_call_first)
    424         @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
    425         @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
    426         th = run_client_streamer(
    427           @sent_msgs, @resp, @pass,
    428           expected_metadata: @metadata,
    429           server_initial_md: @server_initial_md,
    430           server_trailing_md: @server_trailing_md)
    431         expect(
    432           get_response(@stub,
    433                        run_start_call_first: run_start_call_first)).to eq(@resp)
    434         th.join
    435       end
    436 
    437       it 'sends metadata to the server ok when running start_call first' do
    438         run_op_view_metadata_test(true)
    439         check_op_view_of_finished_client_call(
    440           @op, @server_initial_md, @server_trailing_md
    441         ) { |r| GRPC.logger.info(r) }
    442       end
    443 
    444       it 'does not crash when used after the call has been finished' do
    445         run_op_view_metadata_test(false)
    446         check_op_view_of_finished_client_call(
    447           @op, @server_initial_md, @server_trailing_md
    448         ) { |r| GRPC.logger.info(r) }
    449       end
    450     end
    451   end
    452 
    453   describe '#server_streamer', server_streamer: true do
    454     before(:each) do
    455       @sent_msg = 'a_msg'
    456       @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
    457     end
    458 
    459     shared_examples 'server streaming' do
    460       it 'should send a request to/receive replies from a server' do
    461         server_port = create_test_server
    462         host = "localhost:#{server_port}"
    463         th = run_server_streamer(@sent_msg, @replys, @pass)
    464         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    465         expect(get_responses(stub).collect { |r| r }).to eq(@replys)
    466         th.join
    467       end
    468 
    469       it 'should raise an error if the status is not ok' do
    470         server_port = create_test_server
    471         host = "localhost:#{server_port}"
    472         th = run_server_streamer(@sent_msg, @replys, @fail)
    473         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    474         e = get_responses(stub)
    475         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
    476         th.join
    477       end
    478 
    479       it 'should send metadata to the server ok' do
    480         server_port = create_test_server
    481         host = "localhost:#{server_port}"
    482         th = run_server_streamer(@sent_msg, @replys, @fail,
    483                                  expected_metadata: { k1: 'v1', k2: 'v2' })
    484         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    485         e = get_responses(stub)
    486         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
    487         th.join
    488       end
    489 
    490       it 'should raise ArgumentError if metadata contains invalid values' do
    491         @metadata.merge!(k3: 3)
    492         server_port = create_test_server
    493         host = "localhost:#{server_port}"
    494         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    495         expect do
    496           get_responses(stub).collect { |r| r }
    497         end.to raise_error(ArgumentError,
    498                            /Header values must be of type string or array/)
    499       end
    500 
    501       def run_server_streamer_against_client_with_unmarshal_error(
    502         expected_input, replys)
    503         wakey_thread do |notifier|
    504           c = expect_server_to_be_invoked(notifier)
    505           expect(c.remote_read).to eq(expected_input)
    506           begin
    507             replys.each { |r| c.remote_send(r) }
    508           rescue GRPC::Core::CallError
    509             # An attempt to write to the client might fail. This is ok
    510             # because the client call is expected to fail when
    511             # unmarshalling the first response, and to cancel the call,
    512             # and there is a race as for when the server-side call will
    513             # start to fail.
    514             p 'remote_send failed (allowed because call expected to cancel)'
    515           ensure
    516             c.send_status(OK, 'OK', true)
    517             close_active_server_call(c)
    518           end
    519         end
    520       end
    521 
    522       it 'the call terminates when there is an unmarshalling error' do
    523         server_port = create_test_server
    524         host = "localhost:#{server_port}"
    525         th = run_server_streamer_against_client_with_unmarshal_error(
    526           @sent_msg, @replys)
    527         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    528 
    529         unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
    530         expect do
    531           get_responses(stub, unmarshal: unmarshal).collect { |r| r }
    532         end.to raise_error(ArgumentError, 'test unmarshalling error')
    533         th.join
    534       end
    535     end
    536 
    537     describe 'without a call operation' do
    538       def get_responses(stub, unmarshal: noop)
    539         e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
    540                                  metadata: @metadata)
    541         expect(e).to be_a(Enumerator)
    542         e
    543       end
    544 
    545       it_behaves_like 'server streaming'
    546     end
    547 
    548     describe 'via a call operation' do
    549       after(:each) do
    550         @op.wait # make sure wait doesn't hang
    551       end
    552       def get_responses(stub, run_start_call_first: false, unmarshal: noop)
    553         @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
    554                                    return_op: true,
    555                                    metadata: @metadata)
    556         expect(@op).to be_a(GRPC::ActiveCall::Operation)
    557         @op.start_call if run_start_call_first
    558         e = @op.execute
    559         expect(e).to be_a(Enumerator)
    560         e
    561       end
    562 
    563       it_behaves_like 'server streaming'
    564 
    565       def run_op_view_metadata_test(run_start_call_first)
    566         server_port = create_test_server
    567         host = "localhost:#{server_port}"
    568         @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
    569         @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
    570         th = run_server_streamer(
    571           @sent_msg, @replys, @pass,
    572           expected_metadata: @metadata,
    573           server_initial_md: @server_initial_md,
    574           server_trailing_md: @server_trailing_md)
    575         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    576         e = get_responses(stub, run_start_call_first: run_start_call_first)
    577         expect(e.collect { |r| r }).to eq(@replys)
    578         th.join
    579       end
    580 
    581       it 'should send metadata to the server ok when start_call is run first' do
    582         run_op_view_metadata_test(true)
    583         check_op_view_of_finished_client_call(
    584           @op, @server_initial_md, @server_trailing_md) do |responses|
    585           responses.each { |r| GRPC.logger.info(r) }
    586         end
    587       end
    588 
    589       it 'does not crash when used after the call has been finished' do
    590         run_op_view_metadata_test(false)
    591         check_op_view_of_finished_client_call(
    592           @op, @server_initial_md, @server_trailing_md) do |responses|
    593           responses.each { |r| GRPC.logger.info(r) }
    594         end
    595       end
    596 
    597       it 'raises GRPC::Cancelled after the call has been cancelled' do
    598         server_port = create_test_server
    599         host = "localhost:#{server_port}"
    600         th = run_server_streamer(@sent_msg, @replys, @pass)
    601         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
    602         resp = get_responses(stub, run_start_call_first: false)
    603         expect(resp.next).to eq('reply_1')
    604         @op.cancel
    605         expect { resp.next }.to raise_error(GRPC::Cancelled)
    606         th.join
    607       end
    608     end
    609   end
    610 
    611   describe '#bidi_streamer', bidi: true do
    # Prepares three request messages, three replies and a fresh test
    # server whose address is kept in @host.
    before(:each) do
      @sent_msgs = (1..3).map { |i| "msg_#{i}" }
      @replys = (1..3).map { |i| "reply_#{i}" }
      port = create_test_server
      @host = "localhost:#{port}"
    end
    618 
    619     shared_examples 'bidi streaming' do
    620       it 'supports sending all the requests first' do
    621         th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
    622                                                    @pass)
    623         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    624         e = get_responses(stub)
    625         expect(e.collect { |r| r }).to eq(@replys)
    626         th.join
    627       end
    628 
    629       it 'supports client-initiated ping pong' do
    630         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
    631         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    632         e = get_responses(stub)
    633         expect(e.collect { |r| r }).to eq(@sent_msgs)
    634         th.join
    635       end
    636 
    637       it 'supports a server-initiated ping pong' do
    638         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
    639         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    640         e = get_responses(stub)
    641         expect(e.collect { |r| r }).to eq(@sent_msgs)
    642         th.join
    643       end
    644 
    645       it 'should raise an error if the status is not ok' do
    646         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
    647         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    648         e = get_responses(stub)
    649         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
    650         th.join
    651       end
    652 
    653       it 'should raise ArgumentError if metadata contains invalid values' do
    654         @metadata.merge!(k3: 3)
    655         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    656         expect do
    657           get_responses(stub).collect { |r| r }
    658         end.to raise_error(ArgumentError,
    659                            /Header values must be of type string or array/)
    660       end
    661 
    662       it 'terminates if the call fails to start' do
    663         # don't start the server
    664         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    665         expect do
    666           get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
    667         end.to raise_error(GRPC::BadStatus)
    668       end
    669 
    670       it 'should send metadata to the server ok' do
    671         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
    672                                               expected_metadata: @metadata)
    673         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
    674         e = get_responses(stub)
    675         expect(e.collect { |r| r }).to eq(@sent_msgs)
    676         th.join
    677       end
    678 
    679       # Prompted by grpc/github #10526
    680       describe 'surfacing of errors when sending requests' do
    681         def run_server_bidi_send_one_then_read_indefinitely
    682           @server.start
    683           recvd_rpc = @server.request_call
    684           recvd_call = recvd_rpc.call
    685           server_call = GRPC::ActiveCall.new(
    686             recvd_call, noop, noop, INFINITE_FUTURE,
    687             metadata_received: true, started: false)
    688           server_call.send_initial_metadata
    689           server_call.remote_send('server response')
    690           loop do
    691             m = server_call.remote_read
    692             break if m.nil?
    693           end
    694           # can't fail since initial metadata already sent
    695           server_call.send_status(@pass, 'OK', true)
    696           close_active_server_call(server_call)
    697         end
    698 
    699         def verify_error_from_write_thread(stub, requests_to_push,
    700                                            request_queue, expected_description)
    701           # TODO: an improvement might be to raise the original exception from
    702           # bidi call write loops instead of only cancelling the call
    703           failing_marshal_proc = proc do |req|
    704             fail req if req.is_a?(StandardError)
    705             req
    706           end
    707           begin
    708             e = get_responses(stub, marshal_proc: failing_marshal_proc)
    709             first_response = e.next
    710             expect(first_response).to eq('server response')
    711             requests_to_push.each { |req| request_queue.push(req) }
    712             e.collect { |r| r }
    713           rescue GRPC::Unknown => e
    714             exception = e
    715           end
    716           expect(exception.message.include?(expected_description)).to be(true)
    717         end
    718 
    # Wraps a queue so it can be iterated with #each. Iteration never ends
    # by itself: every pass blocks on the queue's #pop and yields the result.
    class BidiErrorTestingEnumerateForeverQueue
      # queue - object responding to #pop (a Queue in these tests)
      def initialize(queue)
        @queue = queue
      end

      # Yields every message popped from the queue, forever.
      # Without a block an Enumerator is returned, instead of consuming a
      # message and then failing on the yield.
      def each
        return enum_for(:each) unless block_given?
        loop do
          msg = @queue.pop
          yield msg
        end
      end
    end
    732 
    # Drives one write-loop error scenario: starts the reading server in a
    # thread, wires @sent_msgs to a never-ending view of a fresh queue, runs
    # the client-side verification and finally joins the server thread.
    def run_error_in_client_request_stream_test(requests_to_push,
                                                expected_error_message)
      # start a server that waits on a read indefinitely - it should
      # see a cancellation and be able to break out
      server_thread = Thread.new do
        run_server_bidi_send_one_then_read_indefinitely
      end
      client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)

      pending_requests = Queue.new
      @sent_msgs =
        BidiErrorTestingEnumerateForeverQueue.new(pending_requests)

      verify_error_from_write_thread(
        client, requests_to_push, pending_requests, expected_error_message)
      # the write loop error should cancel the call and end the
      # server's request stream
      server_thread.join
    end
    751 
    it 'non-GRPC errors from the write loop surface when raised ' \
      'at the start of a request stream' do
      # the very first request pushed is the error itself
      message = 'expect error on first request'
      failing_requests = [StandardError.new(message)]
      run_error_in_client_request_stream_test(failing_requests, message)
    end
    759 
    it 'non-GRPC errors from the write loop surface when raised ' \
      'during the middle of a request stream' do
      message = 'expect error on last request'
      # two ordinary requests go out before the one that raises
      requests = ['one', 'two', StandardError.new(message)]
      run_error_in_client_request_stream_test(requests, message)
    end
    768       end
    769 
    770       # Prompted by grpc/github #14853
    771       describe 'client-side error handling on bidi streams' do
    # Wraps a queue so it can be iterated with #each; a nil popped from the
    # queue acts as the end-of-stream sentinel.
    class EnumeratorQueue
      # queue - object responding to #pop (a Queue in these tests)
      def initialize(queue)
        @queue = queue
      end

      # Yields each message popped from the queue until a nil is popped.
      # Without a block an Enumerator is returned, instead of consuming a
      # message and then failing on the yield.
      def each
        return enum_for(:each) unless block_given?
        loop do
          msg = @queue.pop
          break if msg.nil?
          yield msg
        end
      end
    end
    785 
    # Server side: accepts one call, expects its first read to be
    # 'first message', then shuts the server down and closes it.
    def run_server_bidi_shutdown_after_one_read
      @server.start
      incoming = @server.request_call.call
      active_call = GRPC::ActiveCall.new(incoming, noop, noop, INFINITE_FUTURE,
                                         metadata_received: true,
                                         started: false)
      first_request = active_call.remote_read
      expect(first_request).to eq('first message')
      @server.shutdown_and_notify(from_relative_time(0))
      @server.close
    end
    797 
    it 'receives a grpc status code when writes to a bidi stream fail' do
      # This test tries to trigger the case when a 'SEND_MESSAGE' op
      # and subsequent 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
      # In this case, iteration through the response stream should result
      # in a grpc status code, and the writer thread should not raise an
      # exception.
      server_thread = Thread.new { run_server_bidi_shutdown_after_one_read }
      client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
      outgoing = Queue.new
      @sent_msgs = EnumeratorQueue.new(outgoing)
      responses = get_responses(client)
      outgoing.push('first message')
      # Now wait for the server to shut down.
      server_thread.join
      # Sanity check. This test is not interesting if
      # Thread.abort_on_exception is not set.
      expect(Thread.abort_on_exception).to be(true)
      # An attempt to send a second message should fail now that the
      # server is down; the trailing nil ends the request stream.
      ['second message', nil].each { |msg| outgoing.push(msg) }
      expect { responses.next }.to raise_error(GRPC::BadStatus)
    end
    823 
    # Server side: accepts one call, sends initial metadata and a single
    # 'message', then shuts the server down and closes it.
    def run_server_bidi_shutdown_after_one_write
      @server.start
      incoming = @server.request_call.call
      active_call = GRPC::ActiveCall.new(incoming, noop, noop, INFINITE_FUTURE,
                                         metadata_received: true,
                                         started: false)
      active_call.send_initial_metadata
      active_call.remote_send('message')
      @server.shutdown_and_notify(from_relative_time(0))
      @server.close
    end
    836 
    it 'receives a grpc status code when reading from a failed bidi call' do
      server_thread = Thread.new { run_server_bidi_shutdown_after_one_write }
      client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
      outgoing = Queue.new
      @sent_msgs = EnumeratorQueue.new(outgoing)
      responses = get_responses(client)
      first_response = responses.next
      expect(first_response).to eq('message')
      # Wait for the server to shut down
      server_thread.join
      expect { responses.next }.to raise_error(GRPC::BadStatus)
      # Push a sentinel to allow the writer thread to finish
      outgoing.push(nil)
    end
    852       end
    853     end
    854 
    describe 'without a call operation' do
      # Starts the bidi call directly on the stub, checks that an Enumerator
      # came back and returns it.
      def get_responses(stub, deadline: nil, marshal_proc: noop)
        responses = stub.bidi_streamer(
          @method, @sent_msgs, marshal_proc, noop,
          metadata: @metadata, deadline: deadline)
        expect(responses).to be_a(Enumerator)
        responses
      end

      it_behaves_like 'bidi streaming'
    end
    865 
    866     describe 'via a call operation' do
    # make sure wait doesn't hang
    after(:each) { @op.wait }
    # Starts the bidi call as an operation stored in @op, optionally runs
    # start_call first, then executes it and returns the resulting
    # Enumerator.
    def get_responses(stub, run_start_call_first: false, deadline: nil,
                      marshal_proc: noop)
      @op = stub.bidi_streamer(
        @method, @sent_msgs, marshal_proc, noop,
        return_op: true, metadata: @metadata, deadline: deadline)
      expect(@op).to be_a(GRPC::ActiveCall::Operation)
      @op.start_call if run_start_call_first
      responses = @op.execute
      expect(responses).to be_a(Enumerator)
      responses
    end
    881 
    882       it_behaves_like 'bidi streaming'
    883 
    # Runs a full ping-pong bidi call against a server that sends fixed
    # initial and trailing metadata, and checks every sent message is
    # echoed back. The metadata hashes are kept in instance variables so
    # the calling examples can reuse them.
    def run_op_view_metadata_test(run_start_call_first)
      @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
      @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
      server_thread = run_bidi_streamer_echo_ping_pong(
        @sent_msgs, @pass, true,
        expected_metadata: @metadata,
        server_initial_md: @server_initial_md,
        server_trailing_md: @server_trailing_md)
      client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
      responses = get_responses(
        client, run_start_call_first: run_start_call_first)
      expect(responses.to_a).to eq(@sent_msgs)
      server_thread.join
    end
    897 
    it 'can run start_call before executing the call' do
      run_op_view_metadata_test(true)
      # the block gives the checker something to attempt reads with
      check_op_view_of_finished_client_call(
        @op, @server_initial_md, @server_trailing_md
      ) do |responses|
        responses.each do |response|
          GRPC.logger.info(response)
        end
      end
    end
    905 
    it 'doesnt crash when op_view used after call has finished' do
      run_op_view_metadata_test(false)
      # the block gives the checker something to attempt reads with
      check_op_view_of_finished_client_call(
        @op, @server_initial_md, @server_trailing_md
      ) do |responses|
        responses.each do |response|
          GRPC.logger.info(response)
        end
      end
    end
    913 
    # Server side: accepts one call, sends initial metadata and one
    # message, then invokes wait_for_shutdown_ok_callback before shutting
    # the server down and closing it.
    def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
      @server.start
      incoming = @server.request_call.call
      active_call = GRPC::ActiveCall.new(incoming, noop, noop, INFINITE_FUTURE,
                                         metadata_received: true,
                                         started: false)
      active_call.send_initial_metadata
      active_call.remote_send('server call received')
      wait_for_shutdown_ok_callback.call
      # since the client is cancelling the call,
      # we should be able to shut down cleanly
      @server.shutdown_and_notify(nil)
      @server.close
    end
    929 
    it 'receives a grpc status code when reading from a cancelled bidi call' do
      # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
      # 'RECV_MESSAGE' op failure.
      # An attempt to read a message might fail; in that case, iteration
      # through the response stream should still result in a grpc status.
      shutdown_allowed = false
      shutdown_lock = Mutex.new
      shutdown_signal = ConditionVariable.new
      # blocks the server thread until the client side flips the flag
      wait_for_shutdown_ok_callback = proc do
        shutdown_lock.synchronize do
          until shutdown_allowed
            shutdown_signal.wait(shutdown_lock)
          end
        end
      end
      server_thread = Thread.new do
        run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
      end
      client = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
      outgoing = Queue.new
      @sent_msgs = EnumeratorQueue.new(outgoing)
      responses = get_responses(client)
      first_response = responses.next
      expect(first_response).to eq('server call received')
      @op.cancel
      expect { responses.next }.to raise_error(GRPC::Cancelled)
      # Now let the server proceed to shut down.
      shutdown_lock.synchronize do
        shutdown_allowed = true
        shutdown_signal.broadcast
      end
      server_thread.join
      # Push a sentinel to allow the writer thread to finish
      outgoing.push(nil)
    end
    962     end
    963   end
    964 
    # Starts a server thread (via wakey_thread) that handles one
    # server-streaming call: it checks the received metadata and the single
    # request, sends every reply, then sends the status with trailing
    # metadata. Returns whatever wakey_thread returns.
    def run_server_streamer(expected_input, replys, status,
                            expected_metadata: {},
                            server_initial_md: {},
                            server_trailing_md: {})
      wanted_metadata = expected_metadata.clone
      wakey_thread do |notifier|
        c = expect_server_to_be_invoked(notifier,
                                        metadata_to_send: server_initial_md)
        wanted_metadata.each { |k, v| expect(c.metadata[k.to_s]).to eq(v) }
        expect(c.remote_read).to eq(expected_input)
        replys.each do |reply|
          c.remote_send(reply)
        end
        details = status == @pass ? 'OK' : 'NOK'
        c.send_status(status, details, true, metadata: server_trailing_md)
        close_active_server_call(c)
      end
    end
    983 
    # Starts a server thread (via wakey_thread) that handles one bidi call
    # by reading all expected inputs first and only then sending the
    # replies, followed by the status.
    def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                              status)
      wakey_thread do |notifier|
        c = expect_server_to_be_invoked(notifier)
        expected_inputs.each do |input|
          expect(c.remote_read).to eq(input)
        end
        replys.each do |reply|
          c.remote_send(reply)
        end
        details = status == @pass ? 'OK' : 'NOK'
        c.send_status(status, details, true)
        close_active_server_call(c)
      end
    end
    994 
    # Starts a server thread (via wakey_thread) that handles one bidi call
    # in lock step: each expected input is read and echoed back one at a
    # time. client_starts selects whether the server reads before it sends
    # or sends before it reads.
    def run_bidi_streamer_echo_ping_pong(expected_inputs, status,
                                         client_starts,
                                         expected_metadata: {},
                                         server_initial_md: {},
                                         server_trailing_md: {})
      wanted_metadata = expected_metadata.clone
      wakey_thread do |notifier|
        c = expect_server_to_be_invoked(notifier,
                                        metadata_to_send: server_initial_md)
        wanted_metadata.each { |k, v| expect(c.metadata[k.to_s]).to eq(v) }
        expected_inputs.each do |msg|
          # when the client does not start, the server sends before reading
          c.remote_send(msg) unless client_starts
          expect(c.remote_read).to eq(msg)
          c.remote_send(msg) if client_starts
        end
        details = status == @pass ? 'OK' : 'NOK'
        c.send_status(status, details, true, metadata: server_trailing_md)
        close_active_server_call(c)
      end
    end
   1020 
   # Starts a server thread (via wakey_thread) that handles one
   # client-streaming call: it reads every expected input, checks the
   # received metadata, sends the single response and then the status with
   # trailing metadata.
   def run_client_streamer(expected_inputs, resp, status,
                           expected_metadata: {},
                           server_initial_md: {},
                           server_trailing_md: {})
     wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
       c = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
       expected_inputs.each do |input|
         expect(c.remote_read).to eq(input)
       end
       wanted_metadata.each { |k, v| expect(c.metadata[k.to_s]).to eq(v) }
       c.remote_send(resp)
       details = status == @pass ? 'OK' : 'NOK'
       c.send_status(status, details, true, metadata: server_trailing_md)
       close_active_server_call(c)
     end
   end
   1039 
   # Starts a server thread (via wakey_thread) that handles one unary call:
   # it reads the single request, checks the received metadata, sends the
   # response and then the status with trailing metadata.
   def run_request_response(expected_input, resp, status,
                            expected_metadata: {},
                            server_initial_md: {},
                            server_trailing_md: {})
     wanted_metadata = expected_metadata.clone
     wakey_thread do |notifier|
       c = expect_server_to_be_invoked(notifier,
                                       metadata_to_send: server_initial_md)
       expect(c.remote_read).to eq(expected_input)
       wanted_metadata.each { |k, v| expect(c.metadata[k.to_s]).to eq(v) }
       c.remote_send(resp)
       details = status == @pass ? 'OK' : 'NOK'
       c.send_status(status, details, true, metadata: server_trailing_md)
       close_active_server_call(c)
     end
   end
   1058 
   # Builds @server with server credentials made from the second and third
   # entries returned by load_test_certs (used as private key and cert
   # chain) and adds a port on '0.0.0.0:0'. Returns the result of
   # add_http2_port.
   def create_secure_test_server
     _first, server_key, server_cert = load_test_certs
     key_cert_pair = { private_key: server_key, cert_chain: server_cert }
     secure_credentials = GRPC::Core::ServerCredentials.new(
       nil, [key_cert_pair], false)

     @server = new_core_server_for_testing(nil)
     @server.add_http2_port('0.0.0.0:0', secure_credentials)
   end
   1067 
   # Builds @server and adds an insecure port on '0.0.0.0:0'.
   # Returns the result of add_http2_port.
   def create_test_server
     listen_address = '0.0.0.0:0'
     @server = new_core_server_for_testing(nil)
     @server.add_http2_port(listen_address, :this_port_is_insecure)
   end
   1072 
   # Starts @server, signals the notifier, then waits for one incoming call.
   # The call is given the metadata received with the rpc, a
   # SEND_INITIAL_METADATA batch is run with metadata_to_send, and the call
   # is returned wrapped in a GRPC::ActiveCall.
   def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
     @server.start
     notifier.notify(nil)
     incoming_rpc = @server.request_call
     incoming_call = incoming_rpc.call
     incoming_call.metadata = incoming_rpc.metadata
     incoming_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
     GRPC::ActiveCall.new(
       incoming_call, noop, noop, INFINITE_FUTURE,
       metadata_received: true)
   end
   1083 end
   1084