Home | History | Annotate | Download | only in lib
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/connector.h"
      6 
      7 #include <stdint.h>
      8 #include <utility>
      9 
     10 #include "base/bind.h"
     11 #include "base/location.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/synchronization/lock.h"
     15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
     16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
     17 
     18 namespace mojo {
     19 
     20 Connector::Connector(ScopedMessagePipeHandle message_pipe,
     21                      ConnectorConfig config,
     22                      scoped_refptr<base::SingleThreadTaskRunner> runner)
     23     : message_pipe_(std::move(message_pipe)),
     24       task_runner_(std::move(runner)),
     25       weak_factory_(this) {
     26   if (config == MULTI_THREADED_SEND)
     27     lock_.emplace();
     28 
     29   weak_self_ = weak_factory_.GetWeakPtr();
     30   // Even though we don't have an incoming receiver, we still want to monitor
     31   // the message pipe to know if is closed or encounters an error.
     32   WaitToReadMore();
     33 }
     34 
     35 Connector::~Connector() {
     36   {
     37     // Allow for quick destruction on any thread if the pipe is already closed.
     38     base::AutoLock lock(connected_lock_);
     39     if (!connected_)
     40       return;
     41   }
     42 
     43   DCHECK(thread_checker_.CalledOnValidThread());
     44   CancelWait();
     45 }
     46 
     47 void Connector::CloseMessagePipe() {
     48   // Throw away the returned message pipe.
     49   PassMessagePipe();
     50 }
     51 
     52 ScopedMessagePipeHandle Connector::PassMessagePipe() {
     53   DCHECK(thread_checker_.CalledOnValidThread());
     54 
     55   CancelWait();
     56   internal::MayAutoLock locker(&lock_);
     57   ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
     58   weak_factory_.InvalidateWeakPtrs();
     59   sync_handle_watcher_callback_count_ = 0;
     60 
     61   base::AutoLock lock(connected_lock_);
     62   connected_ = false;
     63   return message_pipe;
     64 }
     65 
     66 void Connector::RaiseError() {
     67   DCHECK(thread_checker_.CalledOnValidThread());
     68 
     69   HandleError(true, true);
     70 }
     71 
     72 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
     73   DCHECK(thread_checker_.CalledOnValidThread());
     74 
     75   if (error_)
     76     return false;
     77 
     78   ResumeIncomingMethodCallProcessing();
     79 
     80   MojoResult rv =
     81       Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
     82   if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
     83     return false;
     84   if (rv != MOJO_RESULT_OK) {
     85     // Users that call WaitForIncomingMessage() should expect their code to be
     86     // re-entered, so we call the error handler synchronously.
     87     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
     88     return false;
     89   }
     90   ignore_result(ReadSingleMessage(&rv));
     91   return (rv == MOJO_RESULT_OK);
     92 }
     93 
     94 void Connector::PauseIncomingMethodCallProcessing() {
     95   DCHECK(thread_checker_.CalledOnValidThread());
     96 
     97   if (paused_)
     98     return;
     99 
    100   paused_ = true;
    101   CancelWait();
    102 }
    103 
    104 void Connector::ResumeIncomingMethodCallProcessing() {
    105   DCHECK(thread_checker_.CalledOnValidThread());
    106 
    107   if (!paused_)
    108     return;
    109 
    110   paused_ = false;
    111   WaitToReadMore();
    112 }
    113 
    114 bool Connector::Accept(Message* message) {
    115   DCHECK(lock_ || thread_checker_.CalledOnValidThread());
    116 
    117   // It shouldn't hurt even if |error_| may be changed by a different thread at
    118   // the same time. The outcome is that we may write into |message_pipe_| after
    119   // encountering an error, which should be fine.
    120   if (error_)
    121     return false;
    122 
    123   internal::MayAutoLock locker(&lock_);
    124 
    125   if (!message_pipe_.is_valid() || drop_writes_)
    126     return true;
    127 
    128   MojoResult rv =
    129       WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
    130                       MOJO_WRITE_MESSAGE_FLAG_NONE);
    131 
    132   switch (rv) {
    133     case MOJO_RESULT_OK:
    134       break;
    135     case MOJO_RESULT_FAILED_PRECONDITION:
    136       // There's no point in continuing to write to this pipe since the other
    137       // end is gone. Avoid writing any future messages. Hide write failures
    138       // from the caller since we'd like them to continue consuming any backlog
    139       // of incoming messages before regarding the message pipe as closed.
    140       drop_writes_ = true;
    141       break;
    142     case MOJO_RESULT_BUSY:
    143       // We'd get a "busy" result if one of the message's handles is:
    144       //   - |message_pipe_|'s own handle;
    145       //   - simultaneously being used on another thread; or
    146       //   - in a "busy" state that prohibits it from being transferred (e.g.,
    147       //     a data pipe handle in the middle of a two-phase read/write,
    148       //     regardless of which thread that two-phase read/write is happening
    149       //     on).
    150       // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
    151       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
    152       // rather than hanging.)
    153       CHECK(false) << "Race condition or other bug detected";
    154       return false;
    155     default:
    156       // This particular write was rejected, presumably because of bad input.
    157       // The pipe is not necessarily in a bad state.
    158       return false;
    159   }
    160   return true;
    161 }
    162 
    163 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
    164   DCHECK(thread_checker_.CalledOnValidThread());
    165 
    166   allow_woken_up_by_others_ = true;
    167 
    168   EnsureSyncWatcherExists();
    169   sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
    170 }
    171 
    172 bool Connector::SyncWatch(const bool* should_stop) {
    173   DCHECK(thread_checker_.CalledOnValidThread());
    174 
    175   if (error_)
    176     return false;
    177 
    178   ResumeIncomingMethodCallProcessing();
    179 
    180   EnsureSyncWatcherExists();
    181   return sync_watcher_->SyncWatch(should_stop);
    182 }
    183 
    184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
    185   heap_profiler_tag_ = tag;
    186   if (handle_watcher_) {
    187     handle_watcher_->set_heap_profiler_tag(tag);
    188   }
    189 }
    190 
    191 void Connector::OnWatcherHandleReady(MojoResult result) {
    192   OnHandleReadyInternal(result);
    193 }
    194 
    195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
    196   base::WeakPtr<Connector> weak_self(weak_self_);
    197 
    198   sync_handle_watcher_callback_count_++;
    199   OnHandleReadyInternal(result);
    200   // At this point, this object might have been deleted.
    201   if (weak_self) {
    202     DCHECK_LT(0u, sync_handle_watcher_callback_count_);
    203     sync_handle_watcher_callback_count_--;
    204   }
    205 }
    206 
    207 void Connector::OnHandleReadyInternal(MojoResult result) {
    208   DCHECK(thread_checker_.CalledOnValidThread());
    209 
    210   if (result != MOJO_RESULT_OK) {
    211     HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
    212     return;
    213   }
    214   ReadAllAvailableMessages();
    215   // At this point, this object might have been deleted. Return.
    216 }
    217 
    218 void Connector::WaitToReadMore() {
    219   CHECK(!paused_);
    220   DCHECK(!handle_watcher_);
    221 
    222   handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
    223   if (heap_profiler_tag_)
    224     handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
    225   MojoResult rv = handle_watcher_->Start(
    226       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
    227       base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
    228 
    229   if (rv != MOJO_RESULT_OK) {
    230     // If the watch failed because the handle is invalid or its conditions can
    231     // no longer be met, we signal the error asynchronously to avoid reentry.
    232     task_runner_->PostTask(
    233         FROM_HERE,
    234         base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
    235   }
    236 
    237   if (allow_woken_up_by_others_) {
    238     EnsureSyncWatcherExists();
    239     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
    240   }
    241 }
    242 
    243 bool Connector::ReadSingleMessage(MojoResult* read_result) {
    244   CHECK(!paused_);
    245 
    246   bool receiver_result = false;
    247 
    248   // Detect if |this| was destroyed or the message pipe was closed/transferred
    249   // during message dispatch.
    250   base::WeakPtr<Connector> weak_self = weak_self_;
    251 
    252   Message message;
    253   const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
    254   *read_result = rv;
    255 
    256   if (rv == MOJO_RESULT_OK) {
    257     receiver_result =
    258         incoming_receiver_ && incoming_receiver_->Accept(&message);
    259   }
    260 
    261   if (!weak_self)
    262     return false;
    263 
    264   if (rv == MOJO_RESULT_SHOULD_WAIT)
    265     return true;
    266 
    267   if (rv != MOJO_RESULT_OK) {
    268     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
    269     return false;
    270   }
    271 
    272   if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
    273     HandleError(true, false);
    274     return false;
    275   }
    276   return true;
    277 }
    278 
    279 void Connector::ReadAllAvailableMessages() {
    280   while (!error_) {
    281     MojoResult rv;
    282 
    283     if (!ReadSingleMessage(&rv)) {
    284       // Return immediately without touching any members. |this| may have been
    285       // destroyed.
    286       return;
    287     }
    288 
    289     if (paused_)
    290       return;
    291 
    292     if (rv == MOJO_RESULT_SHOULD_WAIT)
    293       break;
    294   }
    295 }
    296 
    297 void Connector::CancelWait() {
    298   handle_watcher_.reset();
    299   sync_watcher_.reset();
    300 }
    301 
    302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
    303   if (error_ || !message_pipe_.is_valid())
    304     return;
    305 
    306   if (paused_) {
    307     // Enforce calling the error handler asynchronously if the user has paused
    308     // receiving messages. We need to wait until the user starts receiving
    309     // messages again.
    310     force_async_handler = true;
    311   }
    312 
    313   if (!force_pipe_reset && force_async_handler)
    314     force_pipe_reset = true;
    315 
    316   if (force_pipe_reset) {
    317     CancelWait();
    318     internal::MayAutoLock locker(&lock_);
    319     message_pipe_.reset();
    320     MessagePipe dummy_pipe;
    321     message_pipe_ = std::move(dummy_pipe.handle0);
    322   } else {
    323     CancelWait();
    324   }
    325 
    326   if (force_async_handler) {
    327     if (!paused_)
    328       WaitToReadMore();
    329   } else {
    330     error_ = true;
    331     if (!connection_error_handler_.is_null())
    332       connection_error_handler_.Run();
    333   }
    334 }
    335 
    336 void Connector::EnsureSyncWatcherExists() {
    337   if (sync_watcher_)
    338     return;
    339   sync_watcher_.reset(new SyncHandleWatcher(
    340       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
    341       base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
    342                  base::Unretained(this))));
    343 }
    344 
    345 }  // namespace mojo
    346