Home | History | Annotate | Download | only in common
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/common/message_pump_mojo.h"
      6 
      7 #include <algorithm>
      8 #include <vector>
      9 
     10 #include "base/debug/alias.h"
     11 #include "base/lazy_instance.h"
     12 #include "base/logging.h"
     13 #include "base/threading/thread_local.h"
     14 #include "base/time/time.h"
     15 #include "mojo/common/message_pump_mojo_handler.h"
     16 #include "mojo/common/time_helper.h"
     17 
     18 namespace mojo {
     19 namespace common {
     20 namespace {
     21 
// Per-thread slot holding the MessagePumpMojo that lives on the calling
// thread. It is set by the MessagePumpMojo constructor, reset to NULL by the
// destructor, and read by MessagePumpMojo::current().
base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
    g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
     24 
     25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
     26                                      base::TimeTicks now) {
     27   // The is_null() check matches that of HandleWatcher as well as how
     28   // |delayed_work_time| is used.
     29   if (time_ticks.is_null())
     30     return MOJO_DEADLINE_INDEFINITE;
     31   const int64_t delta = (time_ticks - now).InMicroseconds();
     32   return delta < 0 ? static_cast<MojoDeadline>(0) :
     33                      static_cast<MojoDeadline>(delta);
     34 }
     35 
     36 }  // namespace
     37 
// State needed for one iteration of WaitMany. |handles| and |wait_signals| are
// parallel vectors: wait_signals[i] holds the signals to wait for on
// handles[i]. The first handle and its signals correspond to the control pipe
// (see GetWaitState()); the remaining entries come from the registered
// handlers.
struct MessagePumpMojo::WaitState {
  std::vector<Handle> handles;
  std::vector<MojoHandleSignals> wait_signals;
};
     44 
// State owned by a single invocation of Run(). Run() creates one on its stack
// and publishes it through |run_state_| for the duration of the loop.
struct MessagePumpMojo::RunState {
  // Creates the control pipe. The result of CreateMessagePipe() is not
  // inspected here; Run() CHECKs that both handles are valid afterwards.
  RunState() : should_quit(false) {
    CreateMessagePipe(NULL, &read_handle, &write_handle);
  }

  // Time at which delayed work is next due. Written by ScheduleDelayedWork()
  // and by the delegate's DoDelayedWork(); read by GetDeadlineForWait(). A
  // null value is treated as "no deadline".
  base::TimeTicks delayed_work_time;

  // Used to wake up WaitForWork().
  // SignalControlPipe() writes to |write_handle|; DoInternalWork() waits on
  // and drains |read_handle|.
  ScopedMessagePipeHandle read_handle;
  ScopedMessagePipeHandle write_handle;

  // Set by Quit(); checked by DoRunLoop() after each unit of work.
  bool should_quit;
};
     58 
     59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
     60   DCHECK(!current())
     61       << "There is already a MessagePumpMojo instance on this thread.";
     62   g_tls_current_pump.Pointer()->Set(this);
     63 }
     64 
     65 MessagePumpMojo::~MessagePumpMojo() {
     66   DCHECK_EQ(this, current());
     67   g_tls_current_pump.Pointer()->Set(NULL);
     68 }
     69 
     70 // static
     71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
     72   return scoped_ptr<MessagePump>(new MessagePumpMojo());
     73 }
     74 
     75 // static
     76 MessagePumpMojo* MessagePumpMojo::current() {
     77   return g_tls_current_pump.Pointer()->Get();
     78 }
     79 
     80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
     81                                  const Handle& handle,
     82                                  MojoHandleSignals wait_signals,
     83                                  base::TimeTicks deadline) {
     84   CHECK(handler);
     85   DCHECK(handle.is_valid());
     86   // Assume it's an error if someone tries to reregister an existing handle.
     87   CHECK_EQ(0u, handlers_.count(handle));
     88   Handler handler_data;
     89   handler_data.handler = handler;
     90   handler_data.wait_signals = wait_signals;
     91   handler_data.deadline = deadline;
     92   handler_data.id = next_handler_id_++;
     93   handlers_[handle] = handler_data;
     94 }
     95 
// Unregisters whatever handler is associated with |handle|. Erasing a handle
// that is not registered has no effect.
void MessagePumpMojo::RemoveHandler(const Handle& handle) {
  handlers_.erase(handle);
}
     99 
    100 void MessagePumpMojo::Run(Delegate* delegate) {
    101   RunState run_state;
    102   // TODO: better deal with error handling.
    103   CHECK(run_state.read_handle.is_valid());
    104   CHECK(run_state.write_handle.is_valid());
    105   RunState* old_state = NULL;
    106   {
    107     base::AutoLock auto_lock(run_state_lock_);
    108     old_state = run_state_;
    109     run_state_ = &run_state;
    110   }
    111   DoRunLoop(&run_state, delegate);
    112   {
    113     base::AutoLock auto_lock(run_state_lock_);
    114     run_state_ = old_state;
    115   }
    116 }
    117 
    118 void MessagePumpMojo::Quit() {
    119   base::AutoLock auto_lock(run_state_lock_);
    120   if (run_state_)
    121     run_state_->should_quit = true;
    122 }
    123 
    124 void MessagePumpMojo::ScheduleWork() {
    125   base::AutoLock auto_lock(run_state_lock_);
    126   if (run_state_)
    127     SignalControlPipe(*run_state_);
    128 }
    129 
    130 void MessagePumpMojo::ScheduleDelayedWork(
    131     const base::TimeTicks& delayed_work_time) {
    132   base::AutoLock auto_lock(run_state_lock_);
    133   if (!run_state_)
    134     return;
    135   run_state_->delayed_work_time = delayed_work_time;
    136 }
    137 
    138 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
    139   bool more_work_is_plausible = true;
    140   for (;;) {
    141     const bool block = !more_work_is_plausible;
    142     DoInternalWork(*run_state, block);
    143 
    144     // There isn't a good way to know if there are more handles ready, we assume
    145     // not.
    146     more_work_is_plausible = false;
    147 
    148     if (run_state->should_quit)
    149       break;
    150 
    151     more_work_is_plausible |= delegate->DoWork();
    152     if (run_state->should_quit)
    153       break;
    154 
    155     more_work_is_plausible |= delegate->DoDelayedWork(
    156         &run_state->delayed_work_time);
    157     if (run_state->should_quit)
    158       break;
    159 
    160     if (more_work_is_plausible)
    161       continue;
    162 
    163     more_work_is_plausible = delegate->DoIdleWork();
    164     if (run_state->should_quit)
    165       break;
    166   }
    167 }
    168 
    169 void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
    170   const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
    171   const WaitState wait_state = GetWaitState(run_state);
    172   const MojoResult result =
    173       WaitMany(wait_state.handles, wait_state.wait_signals, deadline);
    174   if (result == 0) {
    175     // Control pipe was written to.
    176     uint32_t num_bytes = 0;
    177     ReadMessageRaw(run_state.read_handle.get(), NULL, &num_bytes, NULL, NULL,
    178                    MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
    179   } else if (result > 0) {
    180     const size_t index = static_cast<size_t>(result);
    181     DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
    182     handlers_[wait_state.handles[index]].handler->OnHandleReady(
    183         wait_state.handles[index]);
    184   } else {
    185     switch (result) {
    186       case MOJO_RESULT_CANCELLED:
    187       case MOJO_RESULT_FAILED_PRECONDITION:
    188         RemoveFirstInvalidHandle(wait_state);
    189         break;
    190       case MOJO_RESULT_DEADLINE_EXCEEDED:
    191         break;
    192       default:
    193         base::debug::Alias(&result);
    194         // Unexpected result is likely fatal, crash so we can determine cause.
    195         CHECK(false);
    196     }
    197   }
    198 
    199   // Notify and remove any handlers whose time has expired. Make a copy in case
    200   // someone tries to add/remove new handlers from notification.
    201   const HandleToHandler cloned_handlers(handlers_);
    202   const base::TimeTicks now(internal::NowTicks());
    203   for (HandleToHandler::const_iterator i = cloned_handlers.begin();
    204        i != cloned_handlers.end(); ++i) {
    205     // Since we're iterating over a clone of the handlers, verify the handler is
    206     // still valid before notifying.
    207     if (!i->second.deadline.is_null() && i->second.deadline < now &&
    208         handlers_.find(i->first) != handlers_.end() &&
    209         handlers_[i->first].id == i->second.id) {
    210       i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
    211     }
    212   }
    213 }
    214 
    215 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
    216   // TODO(sky): deal with control pipe going bad.
    217   for (size_t i = 0; i < wait_state.handles.size(); ++i) {
    218     const MojoResult result =
    219         Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
    220     if (result == MOJO_RESULT_INVALID_ARGUMENT) {
    221       // We should never have an invalid argument. If we do it indicates
    222       // RemoveHandler() was not invoked and is likely to cause problems else
    223       // where in the stack if we ignore it.
    224       CHECK(false);
    225     } else if (result == MOJO_RESULT_FAILED_PRECONDITION ||
    226                result == MOJO_RESULT_CANCELLED) {
    227       CHECK_NE(i, 0u);  // Indicates the control pipe went bad.
    228 
    229       // Remove the handle first, this way if OnHandleError() tries to remove
    230       // the handle our iterator isn't invalidated.
    231       CHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
    232       MessagePumpMojoHandler* handler =
    233           handlers_[wait_state.handles[i]].handler;
    234       handlers_.erase(wait_state.handles[i]);
    235       handler->OnHandleError(wait_state.handles[i], result);
    236       return;
    237     }
    238   }
    239 }
    240 
    241 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
    242   const MojoResult result =
    243       WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
    244                       MOJO_WRITE_MESSAGE_FLAG_NONE);
    245   // If we can't write we likely won't wake up the thread and there is a strong
    246   // chance we'll deadlock.
    247   CHECK_EQ(MOJO_RESULT_OK, result);
    248 }
    249 
    250 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
    251     const RunState& run_state) const {
    252   WaitState wait_state;
    253   wait_state.handles.push_back(run_state.read_handle.get());
    254   wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
    255 
    256   for (HandleToHandler::const_iterator i = handlers_.begin();
    257        i != handlers_.end(); ++i) {
    258     wait_state.handles.push_back(i->first);
    259     wait_state.wait_signals.push_back(i->second.wait_signals);
    260   }
    261   return wait_state;
    262 }
    263 
    264 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
    265     const RunState& run_state) const {
    266   const base::TimeTicks now(internal::NowTicks());
    267   MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
    268                                                   now);
    269   for (HandleToHandler::const_iterator i = handlers_.begin();
    270        i != handlers_.end(); ++i) {
    271     deadline = std::min(
    272         TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
    273   }
    274   return deadline;
    275 }
    276 
    277 }  // namespace common
    278 }  // namespace mojo
    279