Home | History | Annotate | Download | only in tests
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/connector.h"
      6 
      7 #include <stddef.h>
      8 #include <stdlib.h>
      9 #include <string.h>
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/callback.h"
     14 #include "base/callback_helpers.h"
     15 #include "base/message_loop/message_loop.h"
     16 #include "base/run_loop.h"
     17 #include "base/threading/thread_task_runner_handle.h"
     18 #include "mojo/public/cpp/bindings/lib/message_builder.h"
     19 #include "mojo/public/cpp/bindings/tests/message_queue.h"
     20 #include "testing/gtest/include/gtest/gtest.h"
     21 
     22 namespace mojo {
     23 namespace test {
     24 namespace {
     25 
     26 class MessageAccumulator : public MessageReceiver {
     27  public:
     28   MessageAccumulator() {}
     29   explicit MessageAccumulator(const base::Closure& closure)
     30       : closure_(closure) {}
     31 
     32   bool Accept(Message* message) override {
     33     queue_.Push(message);
     34     if (!closure_.is_null())
     35       base::ResetAndReturn(&closure_).Run();
     36     return true;
     37   }
     38 
     39   bool IsEmpty() const { return queue_.IsEmpty(); }
     40 
     41   void Pop(Message* message) { queue_.Pop(message); }
     42 
     43   void set_closure(const base::Closure& closure) { closure_ = closure; }
     44 
     45   size_t size() const { return queue_.size(); }
     46 
     47  private:
     48   MessageQueue queue_;
     49   base::Closure closure_;
     50 };
     51 
     52 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
     53  public:
     54   ConnectorDeletingMessageAccumulator(Connector** connector)
     55       : connector_(connector) {}
     56 
     57   bool Accept(Message* message) override {
     58     delete *connector_;
     59     *connector_ = nullptr;
     60     return MessageAccumulator::Accept(message);
     61   }
     62 
     63  private:
     64   Connector** connector_;
     65 };
     66 
     67 class ReentrantMessageAccumulator : public MessageAccumulator {
     68  public:
     69   ReentrantMessageAccumulator(Connector* connector)
     70       : connector_(connector), number_of_calls_(0) {}
     71 
     72   bool Accept(Message* message) override {
     73     if (!MessageAccumulator::Accept(message))
     74       return false;
     75     number_of_calls_++;
     76     if (number_of_calls_ == 1) {
     77       return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
     78     }
     79     return true;
     80   }
     81 
     82   int number_of_calls() { return number_of_calls_; }
     83 
     84  private:
     85   Connector* connector_;
     86   int number_of_calls_;
     87 };
     88 
     89 class ConnectorTest : public testing::Test {
     90  public:
     91   ConnectorTest() {}
     92 
     93   void SetUp() override {
     94     CreateMessagePipe(nullptr, &handle0_, &handle1_);
     95   }
     96 
     97   void TearDown() override {}
     98 
     99   void AllocMessage(const char* text, Message* message) {
    100     size_t payload_size = strlen(text) + 1;  // Plus null terminator.
    101     internal::MessageBuilder builder(1, payload_size);
    102     memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
    103 
    104     builder.message()->MoveTo(message);
    105   }
    106 
    107  protected:
    108   ScopedMessagePipeHandle handle0_;
    109   ScopedMessagePipeHandle handle1_;
    110 
    111  private:
    112   base::MessageLoop loop_;
    113 };
    114 
    115 TEST_F(ConnectorTest, Basic) {
    116   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    117                        base::ThreadTaskRunnerHandle::Get());
    118   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    119                        base::ThreadTaskRunnerHandle::Get());
    120 
    121   const char kText[] = "hello world";
    122 
    123   Message message;
    124   AllocMessage(kText, &message);
    125 
    126   connector0.Accept(&message);
    127 
    128   base::RunLoop run_loop;
    129   MessageAccumulator accumulator(run_loop.QuitClosure());
    130   connector1.set_incoming_receiver(&accumulator);
    131 
    132   run_loop.Run();
    133 
    134   ASSERT_FALSE(accumulator.IsEmpty());
    135 
    136   Message message_received;
    137   accumulator.Pop(&message_received);
    138 
    139   EXPECT_EQ(
    140       std::string(kText),
    141       std::string(reinterpret_cast<const char*>(message_received.payload())));
    142 }
    143 
    144 TEST_F(ConnectorTest, Basic_Synchronous) {
    145   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    146                        base::ThreadTaskRunnerHandle::Get());
    147   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    148                        base::ThreadTaskRunnerHandle::Get());
    149 
    150   const char kText[] = "hello world";
    151 
    152   Message message;
    153   AllocMessage(kText, &message);
    154 
    155   connector0.Accept(&message);
    156 
    157   MessageAccumulator accumulator;
    158   connector1.set_incoming_receiver(&accumulator);
    159 
    160   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
    161 
    162   ASSERT_FALSE(accumulator.IsEmpty());
    163 
    164   Message message_received;
    165   accumulator.Pop(&message_received);
    166 
    167   EXPECT_EQ(
    168       std::string(kText),
    169       std::string(reinterpret_cast<const char*>(message_received.payload())));
    170 }
    171 
    172 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
    173   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    174                        base::ThreadTaskRunnerHandle::Get());
    175   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    176                        base::ThreadTaskRunnerHandle::Get());
    177 
    178   base::RunLoop run_loop;
    179   MessageAccumulator accumulator(run_loop.QuitClosure());
    180   connector1.set_incoming_receiver(&accumulator);
    181 
    182   const char kText[] = "hello world";
    183 
    184   Message message;
    185   AllocMessage(kText, &message);
    186 
    187   connector0.Accept(&message);
    188 
    189   run_loop.Run();
    190 
    191   ASSERT_FALSE(accumulator.IsEmpty());
    192 
    193   Message message_received;
    194   accumulator.Pop(&message_received);
    195 
    196   EXPECT_EQ(
    197       std::string(kText),
    198       std::string(reinterpret_cast<const char*>(message_received.payload())));
    199 }
    200 
    201 TEST_F(ConnectorTest, Basic_TwoMessages) {
    202   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    203                        base::ThreadTaskRunnerHandle::Get());
    204   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    205                        base::ThreadTaskRunnerHandle::Get());
    206 
    207   const char* kText[] = {"hello", "world"};
    208 
    209   for (size_t i = 0; i < arraysize(kText); ++i) {
    210     Message message;
    211     AllocMessage(kText[i], &message);
    212 
    213     connector0.Accept(&message);
    214   }
    215 
    216   MessageAccumulator accumulator;
    217   connector1.set_incoming_receiver(&accumulator);
    218 
    219   for (size_t i = 0; i < arraysize(kText); ++i) {
    220     if (accumulator.IsEmpty()) {
    221       base::RunLoop run_loop;
    222       accumulator.set_closure(run_loop.QuitClosure());
    223       run_loop.Run();
    224     }
    225     ASSERT_FALSE(accumulator.IsEmpty());
    226 
    227     Message message_received;
    228     accumulator.Pop(&message_received);
    229 
    230     EXPECT_EQ(
    231         std::string(kText[i]),
    232         std::string(reinterpret_cast<const char*>(message_received.payload())));
    233   }
    234 }
    235 
    236 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
    237   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    238                        base::ThreadTaskRunnerHandle::Get());
    239   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    240                        base::ThreadTaskRunnerHandle::Get());
    241 
    242   const char* kText[] = {"hello", "world"};
    243 
    244   for (size_t i = 0; i < arraysize(kText); ++i) {
    245     Message message;
    246     AllocMessage(kText[i], &message);
    247 
    248     connector0.Accept(&message);
    249   }
    250 
    251   MessageAccumulator accumulator;
    252   connector1.set_incoming_receiver(&accumulator);
    253 
    254   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
    255 
    256   ASSERT_FALSE(accumulator.IsEmpty());
    257 
    258   Message message_received;
    259   accumulator.Pop(&message_received);
    260 
    261   EXPECT_EQ(
    262       std::string(kText[0]),
    263       std::string(reinterpret_cast<const char*>(message_received.payload())));
    264 
    265   ASSERT_TRUE(accumulator.IsEmpty());
    266 }
    267 
    268 TEST_F(ConnectorTest, WriteToClosedPipe) {
    269   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    270                        base::ThreadTaskRunnerHandle::Get());
    271 
    272   const char kText[] = "hello world";
    273 
    274   Message message;
    275   AllocMessage(kText, &message);
    276 
    277   // Close the other end of the pipe.
    278   handle1_.reset();
    279 
    280   // Not observed yet because we haven't spun the message loop yet.
    281   EXPECT_FALSE(connector0.encountered_error());
    282 
    283   // Write failures are not reported.
    284   bool ok = connector0.Accept(&message);
    285   EXPECT_TRUE(ok);
    286 
    287   // Still not observed.
    288   EXPECT_FALSE(connector0.encountered_error());
    289 
    290   // Spin the message loop, and then we should start observing the closed pipe.
    291   base::RunLoop run_loop;
    292   connector0.set_connection_error_handler(run_loop.QuitClosure());
    293   run_loop.Run();
    294 
    295   EXPECT_TRUE(connector0.encountered_error());
    296 }
    297 
    298 TEST_F(ConnectorTest, MessageWithHandles) {
    299   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    300                        base::ThreadTaskRunnerHandle::Get());
    301   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    302                        base::ThreadTaskRunnerHandle::Get());
    303 
    304   const char kText[] = "hello world";
    305 
    306   Message message1;
    307   AllocMessage(kText, &message1);
    308 
    309   MessagePipe pipe;
    310   message1.mutable_handles()->push_back(pipe.handle0.release());
    311 
    312   connector0.Accept(&message1);
    313 
    314   // The message should have been transferred, releasing the handles.
    315   EXPECT_TRUE(message1.handles()->empty());
    316 
    317   base::RunLoop run_loop;
    318   MessageAccumulator accumulator(run_loop.QuitClosure());
    319   connector1.set_incoming_receiver(&accumulator);
    320 
    321   run_loop.Run();
    322 
    323   ASSERT_FALSE(accumulator.IsEmpty());
    324 
    325   Message message_received;
    326   accumulator.Pop(&message_received);
    327 
    328   EXPECT_EQ(
    329       std::string(kText),
    330       std::string(reinterpret_cast<const char*>(message_received.payload())));
    331   ASSERT_EQ(1U, message_received.handles()->size());
    332 
    333   // Now send a message to the transferred handle and confirm it's sent through
    334   // to the orginal pipe.
    335   // TODO(vtl): Do we need a better way of "downcasting" the handle types?
    336   ScopedMessagePipeHandle smph;
    337   smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
    338   message_received.mutable_handles()->front() = Handle();
    339   // |smph| now owns this handle.
    340 
    341   Connector connector_received(std::move(smph), Connector::SINGLE_THREADED_SEND,
    342                                base::ThreadTaskRunnerHandle::Get());
    343   Connector connector_original(std::move(pipe.handle1),
    344                                Connector::SINGLE_THREADED_SEND,
    345                                base::ThreadTaskRunnerHandle::Get());
    346 
    347   Message message2;
    348   AllocMessage(kText, &message2);
    349 
    350   connector_received.Accept(&message2);
    351   base::RunLoop run_loop2;
    352   MessageAccumulator accumulator2(run_loop2.QuitClosure());
    353   connector_original.set_incoming_receiver(&accumulator2);
    354   run_loop2.Run();
    355 
    356   ASSERT_FALSE(accumulator2.IsEmpty());
    357 
    358   accumulator2.Pop(&message_received);
    359 
    360   EXPECT_EQ(
    361       std::string(kText),
    362       std::string(reinterpret_cast<const char*>(message_received.payload())));
    363 }
    364 
    365 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
    366   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    367                        base::ThreadTaskRunnerHandle::Get());
    368   // Close the other end of the pipe.
    369   handle1_.reset();
    370   ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
    371 }
    372 
    373 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
    374   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    375                        base::ThreadTaskRunnerHandle::Get());
    376   Connector* connector1 =
    377       new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    378                     base::ThreadTaskRunnerHandle::Get());
    379 
    380   const char kText[] = "hello world";
    381 
    382   Message message;
    383   AllocMessage(kText, &message);
    384 
    385   connector0.Accept(&message);
    386 
    387   ConnectorDeletingMessageAccumulator accumulator(&connector1);
    388   connector1->set_incoming_receiver(&accumulator);
    389 
    390   connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
    391 
    392   ASSERT_FALSE(connector1);
    393   ASSERT_FALSE(accumulator.IsEmpty());
    394 
    395   Message message_received;
    396   accumulator.Pop(&message_received);
    397 
    398   EXPECT_EQ(
    399       std::string(kText),
    400       std::string(reinterpret_cast<const char*>(message_received.payload())));
    401 }
    402 
    403 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
    404   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    405                        base::ThreadTaskRunnerHandle::Get());
    406   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    407                        base::ThreadTaskRunnerHandle::Get());
    408 
    409   const char* kText[] = {"hello", "world"};
    410 
    411   for (size_t i = 0; i < arraysize(kText); ++i) {
    412     Message message;
    413     AllocMessage(kText[i], &message);
    414 
    415     connector0.Accept(&message);
    416   }
    417 
    418   ReentrantMessageAccumulator accumulator(&connector1);
    419   connector1.set_incoming_receiver(&accumulator);
    420 
    421   for (size_t i = 0; i < arraysize(kText); ++i) {
    422     if (accumulator.IsEmpty()) {
    423       base::RunLoop run_loop;
    424       accumulator.set_closure(run_loop.QuitClosure());
    425       run_loop.Run();
    426     }
    427     ASSERT_FALSE(accumulator.IsEmpty());
    428 
    429     Message message_received;
    430     accumulator.Pop(&message_received);
    431 
    432     EXPECT_EQ(
    433         std::string(kText[i]),
    434         std::string(reinterpret_cast<const char*>(message_received.payload())));
    435   }
    436 
    437   ASSERT_EQ(2, accumulator.number_of_calls());
    438 }
    439 
    440 void ForwardErrorHandler(bool* called, const base::Closure& callback) {
    441   *called = true;
    442   callback.Run();
    443 }
    444 
    445 TEST_F(ConnectorTest, RaiseError) {
    446   base::RunLoop run_loop, run_loop2;
    447   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    448                        base::ThreadTaskRunnerHandle::Get());
    449   bool error_handler_called0 = false;
    450   connector0.set_connection_error_handler(
    451       base::Bind(&ForwardErrorHandler, &error_handler_called0,
    452                  run_loop.QuitClosure()));
    453 
    454   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    455                        base::ThreadTaskRunnerHandle::Get());
    456   bool error_handler_called1 = false;
    457   connector1.set_connection_error_handler(
    458       base::Bind(&ForwardErrorHandler, &error_handler_called1,
    459                  run_loop2.QuitClosure()));
    460 
    461   const char kText[] = "hello world";
    462 
    463   Message message;
    464   AllocMessage(kText, &message);
    465 
    466   connector0.Accept(&message);
    467   connector0.RaiseError();
    468 
    469   base::RunLoop run_loop3;
    470   MessageAccumulator accumulator(run_loop3.QuitClosure());
    471   connector1.set_incoming_receiver(&accumulator);
    472 
    473   run_loop3.Run();
    474 
    475   // Messages sent prior to RaiseError() still arrive at the other end.
    476   ASSERT_FALSE(accumulator.IsEmpty());
    477 
    478   Message message_received;
    479   accumulator.Pop(&message_received);
    480 
    481   EXPECT_EQ(
    482       std::string(kText),
    483       std::string(reinterpret_cast<const char*>(message_received.payload())));
    484 
    485   run_loop.Run();
    486   run_loop2.Run();
    487 
    488   // Connection error handler is called at both sides.
    489   EXPECT_TRUE(error_handler_called0);
    490   EXPECT_TRUE(error_handler_called1);
    491 
    492   // The error flag is set at both sides.
    493   EXPECT_TRUE(connector0.encountered_error());
    494   EXPECT_TRUE(connector1.encountered_error());
    495 
    496   // The message pipe handle is valid at both sides.
    497   EXPECT_TRUE(connector0.is_valid());
    498   EXPECT_TRUE(connector1.is_valid());
    499 }
    500 
    501 void PauseConnectorAndRunClosure(Connector* connector,
    502                                  const base::Closure& closure) {
    503   connector->PauseIncomingMethodCallProcessing();
    504   closure.Run();
    505 }
    506 
    507 TEST_F(ConnectorTest, PauseWithQueuedMessages) {
    508   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    509                        base::ThreadTaskRunnerHandle::Get());
    510   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    511                        base::ThreadTaskRunnerHandle::Get());
    512 
    513   const char kText[] = "hello world";
    514 
    515   // Queue up two messages.
    516   Message message;
    517   AllocMessage(kText, &message);
    518   connector0.Accept(&message);
    519   AllocMessage(kText, &message);
    520   connector0.Accept(&message);
    521 
    522   base::RunLoop run_loop;
    523   // Configure the accumulator such that it pauses after the first message is
    524   // received.
    525   MessageAccumulator accumulator(
    526       base::Bind(&PauseConnectorAndRunClosure, &connector1,
    527                  run_loop.QuitClosure()));
    528   connector1.set_incoming_receiver(&accumulator);
    529 
    530   run_loop.Run();
    531 
    532   // As we paused after the first message we should only have gotten one
    533   // message.
    534   ASSERT_EQ(1u, accumulator.size());
    535 }
    536 
    537 void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
    538                               const base::Closure& closure) {
    539   base::RunLoop nested_run_loop;
    540   base::MessageLoop::ScopedNestableTaskAllower allow(
    541       base::MessageLoop::current());
    542   accumulator->set_closure(nested_run_loop.QuitClosure());
    543   nested_run_loop.Run();
    544   closure.Run();
    545 }
    546 
    547 TEST_F(ConnectorTest, ProcessWhenNested) {
    548   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
    549                        base::ThreadTaskRunnerHandle::Get());
    550   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
    551                        base::ThreadTaskRunnerHandle::Get());
    552 
    553   const char kText[] = "hello world";
    554 
    555   // Queue up two messages.
    556   Message message;
    557   AllocMessage(kText, &message);
    558   connector0.Accept(&message);
    559   AllocMessage(kText, &message);
    560   connector0.Accept(&message);
    561 
    562   base::RunLoop run_loop;
    563   MessageAccumulator accumulator;
    564   // When the accumulator gets the first message it spins a nested message
    565   // loop. The loop is quit when another message is received.
    566   accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
    567                                      run_loop.QuitClosure()));
    568   connector1.set_incoming_receiver(&accumulator);
    569 
    570   run_loop.Run();
    571 
    572   ASSERT_EQ(2u, accumulator.size());
    573 }
    574 
    575 }  // namespace
    576 }  // namespace test
    577 }  // namespace mojo
    578