1 # Copyright 2015 gRPC authors. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 15 require 'spec_helper' 16 17 Thread.abort_on_exception = true 18 19 def wakey_thread(&blk) 20 n = GRPC::Notifier.new 21 t = Thread.new do 22 blk.call(n) 23 end 24 t.abort_on_exception = true 25 n.wait 26 t 27 end 28 29 def load_test_certs 30 test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') 31 files = ['ca.pem', 'server1.key', 'server1.pem'] 32 files.map { |f| File.open(File.join(test_root, f)).read } 33 end 34 35 include GRPC::Core::StatusCodes 36 include GRPC::Core::TimeConsts 37 include GRPC::Core::CallOps 38 39 # check that methods on a finished/closed call t crash 40 def check_op_view_of_finished_client_call(op_view, 41 expected_metadata, 42 expected_trailing_metadata) 43 # use read_response_stream to try to iterate through 44 # possible response stream 45 fail('need something to attempt reads') unless block_given? 46 expect do 47 resp = op_view.execute 48 yield resp 49 end.to raise_error(GRPC::Core::CallError) 50 51 expect { op_view.start_call }.to raise_error(RuntimeError) 52 53 sanity_check_values_of_accessors(op_view, 54 expected_metadata, 55 expected_trailing_metadata) 56 57 expect do 58 op_view.wait 59 op_view.cancel 60 op_view.write_flag = 1 61 end.to_not raise_error 62 end 63 64 def sanity_check_values_of_accessors(op_view, 65 expected_metadata, 66 expected_trailing_metadata) 67 expected_status = Struct::Status.new 68 expected_status.code = 0 69 expected_status.details = 'OK' 70 expected_status.metadata = expected_trailing_metadata 71 72 expect(op_view.status).to eq(expected_status) 73 expect(op_view.metadata).to eq(expected_metadata) 74 expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) 75 76 expect(op_view.cancelled?).to be(false) 77 expect(op_view.write_flag).to be(nil) 78 79 # The deadline attribute of a call can be either 80 # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. 81 # TODO: fix so that the accessor always returns the same type. 82 expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || 83 op_view.deadline.is_a?(Time)).to be(true) 84 end 85 86 def close_active_server_call(active_server_call) 87 active_server_call.send(:set_input_stream_done) 88 active_server_call.send(:set_output_stream_done) 89 end 90 91 describe 'ClientStub' do # rubocop:disable Metrics/BlockLength 92 let(:noop) { proc { |x| x } } 93 94 before(:each) do 95 Thread.abort_on_exception = true 96 @server = nil 97 @method = 'an_rpc_method' 98 @pass = OK 99 @fail = INTERNAL 100 @metadata = { k1: 'v1', k2: 'v2' } 101 end 102 103 after(:each) do 104 unless @server.nil? 105 @server.shutdown_and_notify(from_relative_time(2)) 106 @server.close 107 end 108 end 109 110 describe '#new' do 111 let(:fake_host) { 'localhost:0' } 112 it 'can be created from a host and args' do 113 opts = { channel_args: { a_channel_arg: 'an_arg' } } 114 blk = proc do 115 GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts) 116 end 117 expect(&blk).not_to raise_error 118 end 119 120 it 'can be created with an channel override' do 121 opts = { 122 channel_args: { a_channel_arg: 'an_arg' }, 123 channel_override: @ch 124 } 125 blk = proc do 126 GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts) 127 end 128 expect(&blk).not_to raise_error 129 end 130 131 it 'cannot be created with a bad channel override' do 132 blk = proc do 133 opts = { 134 channel_args: { a_channel_arg: 'an_arg' }, 135 channel_override: Object.new 136 } 137 GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts) 138 end 139 expect(&blk).to raise_error 140 end 141 142 it 'cannot be created with bad credentials' do 143 blk = proc do 144 opts = { channel_args: { a_channel_arg: 'an_arg' } } 145 GRPC::ClientStub.new(fake_host, Object.new, **opts) 146 end 147 expect(&blk).to raise_error 148 end 149 150 it 'can be created with test test credentials' do 151 certs = load_test_certs 152 blk = proc do 153 opts = { 154 channel_args: { 155 GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', 156 a_channel_arg: 'an_arg' 157 } 158 } 159 creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil) 160 GRPC::ClientStub.new(fake_host, creds, **opts) 161 end 162 expect(&blk).to_not raise_error 163 end 164 end 165 166 describe '#request_response', request_response: true do 167 before(:each) do 168 @sent_msg, @resp = 'a_msg', 'a_reply' 169 end 170 171 shared_examples 'request response' do 172 it 'should send a request to/receive a reply from a server' do 173 server_port = create_test_server 174 th = run_request_response(@sent_msg, @resp, @pass) 175 stub = GRPC::ClientStub.new("localhost:#{server_port}", 176 :this_channel_is_insecure) 177 expect(get_response(stub)).to eq(@resp) 178 th.join 179 end 180 181 def metadata_test(md) 182 server_port = create_test_server 183 host = "localhost:#{server_port}" 184 th = run_request_response(@sent_msg, @resp, @pass, 185 expected_metadata: md) 186 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 187 @metadata = md 188 expect(get_response(stub)).to eq(@resp) 189 th.join 190 end 191 192 it 'should send metadata to the server ok' do 193 metadata_test(k1: 'v1', k2: 'v2') 194 end 195 196 # these tests mostly try to exercise when md might be allocated 197 # instead of inlined 198 it 'should send metadata with multiple large md to the server ok' do 199 val_array = %w( 200 '00000000000000000000000000000000000000000000000000000000000000', 201 '11111111111111111111111111111111111111111111111111111111111111', 202 '22222222222222222222222222222222222222222222222222222222222222', 203 ) 204 md = { 205 k1: val_array, 206 k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 207 k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', 208 k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc', 209 keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5', 210 'k66666666666666666666666666666666666666666666666666666' => 'v6', 211 'k77777777777777777777777777777777777777777777777777777' => 'v7', 212 'k88888888888888888888888888888888888888888888888888888' => 'v8' 213 } 214 metadata_test(md) 215 end 216 217 it 'should send a request when configured using an override channel' do 218 server_port = create_test_server 219 alt_host = "localhost:#{server_port}" 220 th = run_request_response(@sent_msg, @resp, @pass) 221 ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure) 222 stub = GRPC::ClientStub.new('ignored-host', 223 :this_channel_is_insecure, 224 channel_override: ch) 225 expect(get_response(stub)).to eq(@resp) 226 th.join 227 end 228 229 it 'should raise an error if the status is not OK' do 230 server_port = create_test_server 231 host = "localhost:#{server_port}" 232 th = run_request_response(@sent_msg, @resp, @fail) 233 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 234 blk = proc { get_response(stub) } 235 expect(&blk).to raise_error(GRPC::BadStatus) 236 th.join 237 end 238 239 it 'should receive UNAVAILABLE if call credentials plugin fails' do 240 server_port = create_secure_test_server 241 server_started_notifier = GRPC::Notifier.new 242 th = Thread.new do 243 @server.start 244 server_started_notifier.notify(nil) 245 # Poll on the server so that the client connection can proceed. 246 # We don't expect the server to actually accept a call though. 247 expect { @server.request_call }.to raise_error(GRPC::Core::CallError) 248 end 249 server_started_notifier.wait 250 251 certs = load_test_certs 252 secure_channel_creds = GRPC::Core::ChannelCredentials.new( 253 certs[0], nil, nil) 254 secure_stub_opts = { 255 channel_args: { 256 GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' 257 } 258 } 259 stub = GRPC::ClientStub.new("localhost:#{server_port}", 260 secure_channel_creds, **secure_stub_opts) 261 262 error_message = 'Failing call credentials callback' 263 failing_auth = proc do 264 fail error_message 265 end 266 creds = GRPC::Core::CallCredentials.new(failing_auth) 267 268 unavailable_error_occurred = false 269 begin 270 get_response(stub, credentials: creds) 271 rescue GRPC::Unavailable => e 272 unavailable_error_occurred = true 273 expect(e.details.include?(error_message)).to be true 274 end 275 expect(unavailable_error_occurred).to eq(true) 276 277 @server.shutdown_and_notify(Time.now + 3) 278 th.join 279 @server.close 280 end 281 282 it 'should raise ArgumentError if metadata contains invalid values' do 283 @metadata.merge!(k3: 3) 284 server_port = create_test_server 285 host = "localhost:#{server_port}" 286 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 287 expect do 288 get_response(stub) 289 end.to raise_error(ArgumentError, 290 /Header values must be of type string or array/) 291 end 292 end 293 294 describe 'without a call operation' do 295 def get_response(stub, credentials: nil) 296 GRPC.logger.info(credentials.inspect) 297 stub.request_response(@method, @sent_msg, noop, noop, 298 metadata: @metadata, 299 credentials: credentials) 300 end 301 302 it_behaves_like 'request response' 303 end 304 305 describe 'via a call operation' do 306 after(:each) do 307 # make sure op.wait doesn't hang, even if there's a bad status 308 @op.wait 309 end 310 def get_response(stub, run_start_call_first: false, credentials: nil) 311 @op = stub.request_response(@method, @sent_msg, noop, noop, 312 return_op: true, 313 metadata: @metadata, 314 deadline: from_relative_time(2), 315 credentials: credentials) 316 expect(@op).to be_a(GRPC::ActiveCall::Operation) 317 @op.start_call if run_start_call_first 318 result = @op.execute 319 result 320 end 321 322 it_behaves_like 'request response' 323 324 def run_op_view_metadata_test(run_start_call_first) 325 server_port = create_test_server 326 host = "localhost:#{server_port}" 327 328 @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } 329 @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } 330 th = run_request_response( 331 @sent_msg, @resp, @pass, 332 expected_metadata: @metadata, 333 server_initial_md: @server_initial_md, 334 server_trailing_md: @server_trailing_md) 335 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 336 expect( 337 get_response(stub, 338 run_start_call_first: run_start_call_first)).to eq(@resp) 339 th.join 340 end 341 342 it 'sends metadata to the server ok when running start_call first' do 343 run_op_view_metadata_test(true) 344 check_op_view_of_finished_client_call( 345 @op, @server_initial_md, @server_trailing_md 346 ) { |r| GRPC.logger.info(r) } 347 end 348 349 it 'does not crash when used after the call has been finished' do 350 run_op_view_metadata_test(false) 351 check_op_view_of_finished_client_call( 352 @op, @server_initial_md, @server_trailing_md 353 ) { |r| GRPC.logger.info(r) } 354 end 355 end 356 end 357 358 describe '#client_streamer', client_streamer: true do 359 before(:each) do 360 Thread.abort_on_exception = true 361 server_port = create_test_server 362 host = "localhost:#{server_port}" 363 @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 364 @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } 365 @resp = 'a_reply' 366 end 367 368 shared_examples 'client streaming' do 369 it 'should send requests to/receive a reply from a server' do 370 th = run_client_streamer(@sent_msgs, @resp, @pass) 371 expect(get_response(@stub)).to eq(@resp) 372 th.join 373 end 374 375 it 'should send metadata to the server ok' do 376 th = run_client_streamer(@sent_msgs, @resp, @pass, 377 expected_metadata: @metadata) 378 expect(get_response(@stub)).to eq(@resp) 379 th.join 380 end 381 382 it 'should raise an error if the status is not ok' do 383 th = run_client_streamer(@sent_msgs, @resp, @fail) 384 blk = proc { get_response(@stub) } 385 expect(&blk).to raise_error(GRPC::BadStatus) 386 th.join 387 end 388 389 it 'should raise ArgumentError if metadata contains invalid values' do 390 @metadata.merge!(k3: 3) 391 expect do 392 get_response(@stub) 393 end.to raise_error(ArgumentError, 394 /Header values must be of type string or array/) 395 end 396 end 397 398 describe 'without a call operation' do 399 def get_response(stub) 400 stub.client_streamer(@method, @sent_msgs, noop, noop, 401 metadata: @metadata) 402 end 403 404 it_behaves_like 'client streaming' 405 end 406 407 describe 'via a call operation' do 408 after(:each) do 409 # make sure op.wait doesn't hang, even if there's a bad status 410 @op.wait 411 end 412 def get_response(stub, run_start_call_first: false) 413 @op = stub.client_streamer(@method, @sent_msgs, noop, noop, 414 return_op: true, metadata: @metadata) 415 expect(@op).to be_a(GRPC::ActiveCall::Operation) 416 @op.start_call if run_start_call_first 417 result = @op.execute 418 result 419 end 420 421 it_behaves_like 'client streaming' 422 423 def run_op_view_metadata_test(run_start_call_first) 424 @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } 425 @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } 426 th = run_client_streamer( 427 @sent_msgs, @resp, @pass, 428 expected_metadata: @metadata, 429 server_initial_md: @server_initial_md, 430 server_trailing_md: @server_trailing_md) 431 expect( 432 get_response(@stub, 433 run_start_call_first: run_start_call_first)).to eq(@resp) 434 th.join 435 end 436 437 it 'sends metadata to the server ok when running start_call first' do 438 run_op_view_metadata_test(true) 439 check_op_view_of_finished_client_call( 440 @op, @server_initial_md, @server_trailing_md 441 ) { |r| GRPC.logger.info(r) } 442 end 443 444 it 'does not crash when used after the call has been finished' do 445 run_op_view_metadata_test(false) 446 check_op_view_of_finished_client_call( 447 @op, @server_initial_md, @server_trailing_md 448 ) { |r| GRPC.logger.info(r) } 449 end 450 end 451 end 452 453 describe '#server_streamer', server_streamer: true do 454 before(:each) do 455 @sent_msg = 'a_msg' 456 @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } 457 end 458 459 shared_examples 'server streaming' do 460 it 'should send a request to/receive replies from a server' do 461 server_port = create_test_server 462 host = "localhost:#{server_port}" 463 th = run_server_streamer(@sent_msg, @replys, @pass) 464 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 465 expect(get_responses(stub).collect { |r| r }).to eq(@replys) 466 th.join 467 end 468 469 it 'should raise an error if the status is not ok' do 470 server_port = create_test_server 471 host = "localhost:#{server_port}" 472 th = run_server_streamer(@sent_msg, @replys, @fail) 473 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 474 e = get_responses(stub) 475 expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) 476 th.join 477 end 478 479 it 'should send metadata to the server ok' do 480 server_port = create_test_server 481 host = "localhost:#{server_port}" 482 th = run_server_streamer(@sent_msg, @replys, @fail, 483 expected_metadata: { k1: 'v1', k2: 'v2' }) 484 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 485 e = get_responses(stub) 486 expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) 487 th.join 488 end 489 490 it 'should raise ArgumentError if metadata contains invalid values' do 491 @metadata.merge!(k3: 3) 492 server_port = create_test_server 493 host = "localhost:#{server_port}" 494 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 495 expect do 496 get_responses(stub).collect { |r| r } 497 end.to raise_error(ArgumentError, 498 /Header values must be of type string or array/) 499 end 500 501 def run_server_streamer_against_client_with_unmarshal_error( 502 expected_input, replys) 503 wakey_thread do |notifier| 504 c = expect_server_to_be_invoked(notifier) 505 expect(c.remote_read).to eq(expected_input) 506 begin 507 replys.each { |r| c.remote_send(r) } 508 rescue GRPC::Core::CallError 509 # An attempt to write to the client might fail. This is ok 510 # because the client call is expected to fail when 511 # unmarshalling the first response, and to cancel the call, 512 # and there is a race as for when the server-side call will 513 # start to fail. 514 p 'remote_send failed (allowed because call expected to cancel)' 515 ensure 516 c.send_status(OK, 'OK', true) 517 close_active_server_call(c) 518 end 519 end 520 end 521 522 it 'the call terminates when there is an unmarshalling error' do 523 server_port = create_test_server 524 host = "localhost:#{server_port}" 525 th = run_server_streamer_against_client_with_unmarshal_error( 526 @sent_msg, @replys) 527 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 528 529 unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } 530 expect do 531 get_responses(stub, unmarshal: unmarshal).collect { |r| r } 532 end.to raise_error(ArgumentError, 'test unmarshalling error') 533 th.join 534 end 535 end 536 537 describe 'without a call operation' do 538 def get_responses(stub, unmarshal: noop) 539 e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, 540 metadata: @metadata) 541 expect(e).to be_a(Enumerator) 542 e 543 end 544 545 it_behaves_like 'server streaming' 546 end 547 548 describe 'via a call operation' do 549 after(:each) do 550 @op.wait # make sure wait doesn't hang 551 end 552 def get_responses(stub, run_start_call_first: false, unmarshal: noop) 553 @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal, 554 return_op: true, 555 metadata: @metadata) 556 expect(@op).to be_a(GRPC::ActiveCall::Operation) 557 @op.start_call if run_start_call_first 558 e = @op.execute 559 expect(e).to be_a(Enumerator) 560 e 561 end 562 563 it_behaves_like 'server streaming' 564 565 def run_op_view_metadata_test(run_start_call_first) 566 server_port = create_test_server 567 host = "localhost:#{server_port}" 568 @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } 569 @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } 570 th = run_server_streamer( 571 @sent_msg, @replys, @pass, 572 expected_metadata: @metadata, 573 server_initial_md: @server_initial_md, 574 server_trailing_md: @server_trailing_md) 575 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 576 e = get_responses(stub, run_start_call_first: run_start_call_first) 577 expect(e.collect { |r| r }).to eq(@replys) 578 th.join 579 end 580 581 it 'should send metadata to the server ok when start_call is run first' do 582 run_op_view_metadata_test(true) 583 check_op_view_of_finished_client_call( 584 @op, @server_initial_md, @server_trailing_md) do |responses| 585 responses.each { |r| GRPC.logger.info(r) } 586 end 587 end 588 589 it 'does not crash when used after the call has been finished' do 590 run_op_view_metadata_test(false) 591 check_op_view_of_finished_client_call( 592 @op, @server_initial_md, @server_trailing_md) do |responses| 593 responses.each { |r| GRPC.logger.info(r) } 594 end 595 end 596 597 it 'raises GRPC::Cancelled after the call has been cancelled' do 598 server_port = create_test_server 599 host = "localhost:#{server_port}" 600 th = run_server_streamer(@sent_msg, @replys, @pass) 601 stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) 602 resp = get_responses(stub, run_start_call_first: false) 603 expect(resp.next).to eq('reply_1') 604 @op.cancel 605 expect { resp.next }.to raise_error(GRPC::Cancelled) 606 th.join 607 end 608 end 609 end 610 611 describe '#bidi_streamer', bidi: true do 612 before(:each) do 613 @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } 614 @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } 615 server_port = create_test_server 616 @host = "localhost:#{server_port}" 617 end 618 619 shared_examples 'bidi streaming' do 620 it 'supports sending all the requests first' do 621 th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, 622 @pass) 623 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 624 e = get_responses(stub) 625 expect(e.collect { |r| r }).to eq(@replys) 626 th.join 627 end 628 629 it 'supports client-initiated ping pong' do 630 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) 631 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 632 e = get_responses(stub) 633 expect(e.collect { |r| r }).to eq(@sent_msgs) 634 th.join 635 end 636 637 it 'supports a server-initiated ping pong' do 638 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) 639 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 640 e = get_responses(stub) 641 expect(e.collect { |r| r }).to eq(@sent_msgs) 642 th.join 643 end 644 645 it 'should raise an error if the status is not ok' do 646 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) 647 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 648 e = get_responses(stub) 649 expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) 650 th.join 651 end 652 653 it 'should raise ArgumentError if metadata contains invalid values' do 654 @metadata.merge!(k3: 3) 655 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 656 expect do 657 get_responses(stub).collect { |r| r } 658 end.to raise_error(ArgumentError, 659 /Header values must be of type string or array/) 660 end 661 662 it 'terminates if the call fails to start' do 663 # don't start the server 664 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 665 expect do 666 get_responses(stub, deadline: from_relative_time(0)).collect { |r| r } 667 end.to raise_error(GRPC::BadStatus) 668 end 669 670 it 'should send metadata to the server ok' do 671 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, 672 expected_metadata: @metadata) 673 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 674 e = get_responses(stub) 675 expect(e.collect { |r| r }).to eq(@sent_msgs) 676 th.join 677 end 678 679 # Prompted by grpc/github #10526 680 describe 'surfacing of errors when sending requests' do 681 def run_server_bidi_send_one_then_read_indefinitely 682 @server.start 683 recvd_rpc = @server.request_call 684 recvd_call = recvd_rpc.call 685 server_call = GRPC::ActiveCall.new( 686 recvd_call, noop, noop, INFINITE_FUTURE, 687 metadata_received: true, started: false) 688 server_call.send_initial_metadata 689 server_call.remote_send('server response') 690 loop do 691 m = server_call.remote_read 692 break if m.nil? 693 end 694 # can't fail since initial metadata already sent 695 server_call.send_status(@pass, 'OK', true) 696 close_active_server_call(server_call) 697 end 698 699 def verify_error_from_write_thread(stub, requests_to_push, 700 request_queue, expected_description) 701 # TODO: an improvement might be to raise the original exception from 702 # bidi call write loops instead of only cancelling the call 703 failing_marshal_proc = proc do |req| 704 fail req if req.is_a?(StandardError) 705 req 706 end 707 begin 708 e = get_responses(stub, marshal_proc: failing_marshal_proc) 709 first_response = e.next 710 expect(first_response).to eq('server response') 711 requests_to_push.each { |req| request_queue.push(req) } 712 e.collect { |r| r } 713 rescue GRPC::Unknown => e 714 exception = e 715 end 716 expect(exception.message.include?(expected_description)).to be(true) 717 end 718 719 # Provides an Enumerable view of a Queue 720 class BidiErrorTestingEnumerateForeverQueue 721 def initialize(queue) 722 @queue = queue 723 end 724 725 def each 726 loop do 727 msg = @queue.pop 728 yield msg 729 end 730 end 731 end 732 733 def run_error_in_client_request_stream_test(requests_to_push, 734 expected_error_message) 735 # start a server that waits on a read indefinitely - it should 736 # see a cancellation and be able to break out 737 th = Thread.new { run_server_bidi_send_one_then_read_indefinitely } 738 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 739 740 request_queue = Queue.new 741 @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue) 742 743 verify_error_from_write_thread(stub, 744 requests_to_push, 745 request_queue, 746 expected_error_message) 747 # the write loop errror should cancel the call and end the 748 # server's request stream 749 th.join 750 end 751 752 it 'non-GRPC errors from the write loop surface when raised ' \ 753 'at the start of a request stream' do 754 expected_error_message = 'expect error on first request' 755 requests_to_push = [StandardError.new(expected_error_message)] 756 run_error_in_client_request_stream_test(requests_to_push, 757 expected_error_message) 758 end 759 760 it 'non-GRPC errors from the write loop surface when raised ' \ 761 'during the middle of a request stream' do 762 expected_error_message = 'expect error on last request' 763 requests_to_push = %w( one two ) 764 requests_to_push << StandardError.new(expected_error_message) 765 run_error_in_client_request_stream_test(requests_to_push, 766 expected_error_message) 767 end 768 end 769 770 # Prompted by grpc/github #14853 771 describe 'client-side error handling on bidi streams' do 772 class EnumeratorQueue 773 def initialize(queue) 774 @queue = queue 775 end 776 777 def each 778 loop do 779 msg = @queue.pop 780 break if msg.nil? 781 yield msg 782 end 783 end 784 end 785 786 def run_server_bidi_shutdown_after_one_read 787 @server.start 788 recvd_rpc = @server.request_call 789 recvd_call = recvd_rpc.call 790 server_call = GRPC::ActiveCall.new( 791 recvd_call, noop, noop, INFINITE_FUTURE, 792 metadata_received: true, started: false) 793 expect(server_call.remote_read).to eq('first message') 794 @server.shutdown_and_notify(from_relative_time(0)) 795 @server.close 796 end 797 798 it 'receives a grpc status code when writes to a bidi stream fail' do 799 # This test tries to trigger the case when a 'SEND_MESSAGE' op 800 # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails. 801 # In this case, iteration through the response stream should result 802 # in a grpc status code, and the writer thread should not raise an 803 # exception. 804 server_thread = Thread.new do 805 run_server_bidi_shutdown_after_one_read 806 end 807 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 808 request_queue = Queue.new 809 @sent_msgs = EnumeratorQueue.new(request_queue) 810 responses = get_responses(stub) 811 request_queue.push('first message') 812 # Now wait for the server to shut down. 813 server_thread.join 814 # Sanity check. This test is not interesting if 815 # Thread.abort_on_exception is not set. 816 expect(Thread.abort_on_exception).to be(true) 817 # An attempt to send a second message should fail now that the 818 # server is down. 819 request_queue.push('second message') 820 request_queue.push(nil) 821 expect { responses.next }.to raise_error(GRPC::BadStatus) 822 end 823 824 def run_server_bidi_shutdown_after_one_write 825 @server.start 826 recvd_rpc = @server.request_call 827 recvd_call = recvd_rpc.call 828 server_call = GRPC::ActiveCall.new( 829 recvd_call, noop, noop, INFINITE_FUTURE, 830 metadata_received: true, started: false) 831 server_call.send_initial_metadata 832 server_call.remote_send('message') 833 @server.shutdown_and_notify(from_relative_time(0)) 834 @server.close 835 end 836 837 it 'receives a grpc status code when reading from a failed bidi call' do 838 server_thread = Thread.new do 839 run_server_bidi_shutdown_after_one_write 840 end 841 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 842 request_queue = Queue.new 843 @sent_msgs = EnumeratorQueue.new(request_queue) 844 responses = get_responses(stub) 845 expect(responses.next).to eq('message') 846 # Wait for the server to shut down 847 server_thread.join 848 expect { responses.next }.to raise_error(GRPC::BadStatus) 849 # Push a sentinel to allow the writer thread to finish 850 request_queue.push(nil) 851 end 852 end 853 end 854 855 describe 'without a call operation' do 856 def get_responses(stub, deadline: nil, marshal_proc: noop) 857 e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop, 858 metadata: @metadata, deadline: deadline) 859 expect(e).to be_a(Enumerator) 860 e 861 end 862 863 it_behaves_like 'bidi streaming' 864 end 865 866 describe 'via a call operation' do 867 after(:each) do 868 @op.wait # make sure wait doesn't hang 869 end 870 def get_responses(stub, run_start_call_first: false, deadline: nil, 871 marshal_proc: noop) 872 @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop, 873 return_op: true, 874 metadata: @metadata, deadline: deadline) 875 expect(@op).to be_a(GRPC::ActiveCall::Operation) 876 @op.start_call if run_start_call_first 877 e = @op.execute 878 expect(e).to be_a(Enumerator) 879 e 880 end 881 882 it_behaves_like 'bidi streaming' 883 884 def run_op_view_metadata_test(run_start_call_first) 885 @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } 886 @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } 887 th = run_bidi_streamer_echo_ping_pong( 888 @sent_msgs, @pass, true, 889 expected_metadata: @metadata, 890 server_initial_md: @server_initial_md, 891 server_trailing_md: @server_trailing_md) 892 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 893 e = get_responses(stub, run_start_call_first: run_start_call_first) 894 expect(e.collect { |r| r }).to eq(@sent_msgs) 895 th.join 896 end 897 898 it 'can run start_call before executing the call' do 899 run_op_view_metadata_test(true) 900 check_op_view_of_finished_client_call( 901 @op, @server_initial_md, @server_trailing_md) do |responses| 902 responses.each { |r| GRPC.logger.info(r) } 903 end 904 end 905 906 it 'doesnt crash when op_view used after call has finished' do 907 run_op_view_metadata_test(false) 908 check_op_view_of_finished_client_call( 909 @op, @server_initial_md, @server_trailing_md) do |responses| 910 responses.each { |r| GRPC.logger.info(r) } 911 end 912 end 913 914 def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) 915 @server.start 916 recvd_rpc = @server.request_call 917 recvd_call = recvd_rpc.call 918 server_call = GRPC::ActiveCall.new( 919 recvd_call, noop, noop, INFINITE_FUTURE, 920 metadata_received: true, started: false) 921 server_call.send_initial_metadata 922 server_call.remote_send('server call received') 923 wait_for_shutdown_ok_callback.call 924 # since the client is cancelling the call, 925 # we should be able to shut down cleanly 926 @server.shutdown_and_notify(nil) 927 @server.close 928 end 929 930 it 'receives a grpc status code when reading from a cancelled bidi call' do 931 # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or 932 # 'RECV_MESSAGE' op failure. 933 # An attempt to read a message might fail; in that case, iteration 934 # through the response stream should still result in a grpc status. 935 server_can_shutdown = false 936 server_can_shutdown_mu = Mutex.new 937 server_can_shutdown_cv = ConditionVariable.new 938 wait_for_shutdown_ok_callback = proc do 939 server_can_shutdown_mu.synchronize do 940 server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown 941 end 942 end 943 server_thread = Thread.new do 944 run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) 945 end 946 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) 947 request_queue = Queue.new 948 @sent_msgs = EnumeratorQueue.new(request_queue) 949 responses = get_responses(stub) 950 expect(responses.next).to eq('server call received') 951 @op.cancel 952 expect { responses.next }.to raise_error(GRPC::Cancelled) 953 # Now let the server proceed to shut down. 954 server_can_shutdown_mu.synchronize do 955 server_can_shutdown = true 956 server_can_shutdown_cv.broadcast 957 end 958 server_thread.join 959 # Push a sentinel to allow the writer thread to finish 960 request_queue.push(nil) 961 end 962 end 963 end 964 965 def run_server_streamer(expected_input, replys, status, 966 expected_metadata: {}, 967 server_initial_md: {}, 968 server_trailing_md: {}) 969 wanted_metadata = expected_metadata.clone 970 wakey_thread do |notifier| 971 c = expect_server_to_be_invoked( 972 notifier, metadata_to_send: server_initial_md) 973 wanted_metadata.each do |k, v| 974 expect(c.metadata[k.to_s]).to eq(v) 975 end 976 expect(c.remote_read).to eq(expected_input) 977 replys.each { |r| c.remote_send(r) } 978 c.send_status(status, status == @pass ? 'OK' : 'NOK', true, 979 metadata: server_trailing_md) 980 close_active_server_call(c) 981 end 982 end 983 984 def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, 985 status) 986 wakey_thread do |notifier| 987 c = expect_server_to_be_invoked(notifier) 988 expected_inputs.each { |i| expect(c.remote_read).to eq(i) } 989 replys.each { |r| c.remote_send(r) } 990 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) 991 close_active_server_call(c) 992 end 993 end 994 995 def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, 996 expected_metadata: {}, 997 server_initial_md: {}, 998 server_trailing_md: {}) 999 wanted_metadata = expected_metadata.clone 1000 wakey_thread do |notifier| 1001 c = expect_server_to_be_invoked( 1002 notifier, metadata_to_send: server_initial_md) 1003 wanted_metadata.each do |k, v| 1004 expect(c.metadata[k.to_s]).to eq(v) 1005 end 1006 expected_inputs.each do |i| 1007 if client_starts 1008 expect(c.remote_read).to eq(i) 1009 c.remote_send(i) 1010 else 1011 c.remote_send(i) 1012 expect(c.remote_read).to eq(i) 1013 end 1014 end 1015 c.send_status(status, status == @pass ? 'OK' : 'NOK', true, 1016 metadata: server_trailing_md) 1017 close_active_server_call(c) 1018 end 1019 end 1020 1021 def run_client_streamer(expected_inputs, resp, status, 1022 expected_metadata: {}, 1023 server_initial_md: {}, 1024 server_trailing_md: {}) 1025 wanted_metadata = expected_metadata.clone 1026 wakey_thread do |notifier| 1027 c = expect_server_to_be_invoked( 1028 notifier, metadata_to_send: server_initial_md) 1029 expected_inputs.each { |i| expect(c.remote_read).to eq(i) } 1030 wanted_metadata.each do |k, v| 1031 expect(c.metadata[k.to_s]).to eq(v) 1032 end 1033 c.remote_send(resp) 1034 c.send_status(status, status == @pass ? 'OK' : 'NOK', true, 1035 metadata: server_trailing_md) 1036 close_active_server_call(c) 1037 end 1038 end 1039 1040 def run_request_response(expected_input, resp, status, 1041 expected_metadata: {}, 1042 server_initial_md: {}, 1043 server_trailing_md: {}) 1044 wanted_metadata = expected_metadata.clone 1045 wakey_thread do |notifier| 1046 c = expect_server_to_be_invoked( 1047 notifier, metadata_to_send: server_initial_md) 1048 expect(c.remote_read).to eq(expected_input) 1049 wanted_metadata.each do |k, v| 1050 expect(c.metadata[k.to_s]).to eq(v) 1051 end 1052 c.remote_send(resp) 1053 c.send_status(status, status == @pass ? 'OK' : 'NOK', true, 1054 metadata: server_trailing_md) 1055 close_active_server_call(c) 1056 end 1057 end 1058 1059 def create_secure_test_server 1060 certs = load_test_certs 1061 secure_credentials = GRPC::Core::ServerCredentials.new( 1062 nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) 1063 1064 @server = new_core_server_for_testing(nil) 1065 @server.add_http2_port('0.0.0.0:0', secure_credentials) 1066 end 1067 1068 def create_test_server 1069 @server = new_core_server_for_testing(nil) 1070 @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 1071 end 1072 1073 def expect_server_to_be_invoked(notifier, metadata_to_send: nil) 1074 @server.start 1075 notifier.notify(nil) 1076 recvd_rpc = @server.request_call 1077 recvd_call = recvd_rpc.call 1078 recvd_call.metadata = recvd_rpc.metadata 1079 recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) 1080 GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, 1081 metadata_received: true) 1082 end 1083 end 1084