Home | History | Annotate | Download | only in common
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/common/message_pump_mojo.h"
      6 
      7 #include <algorithm>
      8 #include <vector>
      9 
     10 #include "base/logging.h"
     11 #include "base/time/time.h"
     12 #include "mojo/common/message_pump_mojo_handler.h"
     13 
     14 namespace mojo {
     15 namespace common {
     16 
     17 // State needed for one iteration of WaitMany. The first handle and flags
     18 // corresponds to that of the control pipe.
     19 struct MessagePumpMojo::WaitState {
     20   std::vector<Handle> handles;
     21   std::vector<MojoWaitFlags> wait_flags;
     22 };
     23 
     24 struct MessagePumpMojo::RunState {
     25   RunState() : should_quit(false) {
     26     CreateMessagePipe(&read_handle, &write_handle);
     27   }
     28 
     29   base::TimeTicks delayed_work_time;
     30 
     31   // Used to wake up WaitForWork().
     32   ScopedMessagePipeHandle read_handle;
     33   ScopedMessagePipeHandle write_handle;
     34 
     35   bool should_quit;
     36 };
     37 
     38 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
     39 }
     40 
     41 MessagePumpMojo::~MessagePumpMojo() {
     42 }
     43 
     44 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
     45                                  const Handle& handle,
     46                                  MojoWaitFlags wait_flags,
     47                                  base::TimeTicks deadline) {
     48   DCHECK(handler);
     49   DCHECK(handle.is_valid());
     50   // Assume it's an error if someone tries to reregister an existing handle.
     51   DCHECK_EQ(0u, handlers_.count(handle));
     52   Handler handler_data;
     53   handler_data.handler = handler;
     54   handler_data.wait_flags = wait_flags;
     55   handler_data.deadline = deadline;
     56   handler_data.id = next_handler_id_++;
     57   handlers_[handle] = handler_data;
     58 }
     59 
     60 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
     61   handlers_.erase(handle);
     62 }
     63 
     64 void MessagePumpMojo::Run(Delegate* delegate) {
     65   RunState* old_state = run_state_;
     66   RunState run_state;
     67   // TODO: better deal with error handling.
     68   CHECK(run_state.read_handle.is_valid());
     69   CHECK(run_state.write_handle.is_valid());
     70   run_state_ = &run_state;
     71   bool more_work_is_plausible = true;
     72   for (;;) {
     73     const bool block = !more_work_is_plausible;
     74     DoInternalWork(block);
     75 
     76     // There isn't a good way to know if there are more handles ready, we assume
     77     // not.
     78     more_work_is_plausible = false;
     79 
     80     if (run_state.should_quit)
     81       break;
     82 
     83     more_work_is_plausible |= delegate->DoWork();
     84     if (run_state.should_quit)
     85       break;
     86 
     87     more_work_is_plausible |= delegate->DoDelayedWork(
     88         &run_state.delayed_work_time);
     89     if (run_state.should_quit)
     90       break;
     91 
     92     if (more_work_is_plausible)
     93       continue;
     94 
     95     more_work_is_plausible = delegate->DoIdleWork();
     96     if (run_state.should_quit)
     97       break;
     98   }
     99   run_state_ = old_state;
    100 }
    101 
    102 void MessagePumpMojo::Quit() {
    103   if (run_state_)
    104     run_state_->should_quit = true;
    105 }
    106 
    107 void MessagePumpMojo::ScheduleWork() {
    108   SignalControlPipe();
    109 }
    110 
    111 void MessagePumpMojo::ScheduleDelayedWork(
    112     const base::TimeTicks& delayed_work_time) {
    113   if (!run_state_)
    114     return;
    115   run_state_->delayed_work_time = delayed_work_time;
    116   SignalControlPipe();
    117 }
    118 
    119 void MessagePumpMojo::DoInternalWork(bool block) {
    120   const MojoDeadline deadline = block ? GetDeadlineForWait() : 0;
    121   const WaitState wait_state = GetWaitState();
    122   const MojoResult result =
    123       WaitMany(wait_state.handles, wait_state.wait_flags, deadline);
    124   if (result == 0) {
    125     // Control pipe was written to.
    126     uint32_t num_bytes = 0;
    127     ReadMessageRaw(run_state_->read_handle.get(), NULL, &num_bytes, NULL, NULL,
    128                    MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
    129   } else if (result > 0) {
    130     const size_t index = static_cast<size_t>(result);
    131     DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
    132     handlers_[wait_state.handles[index]].handler->OnHandleReady(
    133         wait_state.handles[index]);
    134   } else {
    135     switch (result) {
    136       case MOJO_RESULT_INVALID_ARGUMENT:
    137       case MOJO_RESULT_FAILED_PRECONDITION:
    138         RemoveFirstInvalidHandle(wait_state);
    139         break;
    140       case MOJO_RESULT_DEADLINE_EXCEEDED:
    141         break;
    142       default:
    143         NOTREACHED();
    144     }
    145   }
    146 
    147   // Notify and remove any handlers whose time has expired. Make a copy in case
    148   // someone tries to add/remove new handlers from notification.
    149   const HandleToHandler cloned_handlers(handlers_);
    150   const base::TimeTicks now(base::TimeTicks::Now());
    151   for (HandleToHandler::const_iterator i = cloned_handlers.begin();
    152        i != cloned_handlers.end(); ++i) {
    153     // Since we're iterating over a clone of the handlers, verify the handler is
    154     // still valid before notifying.
    155     if (!i->second.deadline.is_null() && i->second.deadline < now &&
    156         handlers_.find(i->first) != handlers_.end() &&
    157         handlers_[i->first].id == i->second.id) {
    158       i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
    159     }
    160   }
    161 }
    162 
    163 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
    164   // TODO(sky): deal with control pipe going bad.
    165   for (size_t i = 1; i < wait_state.handles.size(); ++i) {
    166     const MojoResult result =
    167         Wait(wait_state.handles[i], wait_state.wait_flags[i], 0);
    168     if (result == MOJO_RESULT_INVALID_ARGUMENT ||
    169         result == MOJO_RESULT_FAILED_PRECONDITION) {
    170       // Remove the handle first, this way if OnHandleError() tries to remove
    171       // the handle our iterator isn't invalidated.
    172       DCHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
    173       MessagePumpMojoHandler* handler =
    174           handlers_[wait_state.handles[i]].handler;
    175       handlers_.erase(wait_state.handles[i]);
    176       handler->OnHandleError(wait_state.handles[i], result);
    177       return;
    178     } else {
    179       DCHECK_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result);
    180     }
    181   }
    182 }
    183 
    184 void MessagePumpMojo::SignalControlPipe() {
    185   if (!run_state_)
    186     return;
    187 
    188   // TODO(sky): deal with error?
    189   WriteMessageRaw(run_state_->write_handle.get(), NULL, 0, NULL, 0,
    190                   MOJO_WRITE_MESSAGE_FLAG_NONE);
    191 }
    192 
    193 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
    194   WaitState wait_state;
    195   wait_state.handles.push_back(run_state_->read_handle.get());
    196   wait_state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
    197 
    198   for (HandleToHandler::const_iterator i = handlers_.begin();
    199        i != handlers_.end(); ++i) {
    200     wait_state.handles.push_back(i->first);
    201     wait_state.wait_flags.push_back(i->second.wait_flags);
    202   }
    203   return wait_state;
    204 }
    205 
    206 MojoDeadline MessagePumpMojo::GetDeadlineForWait() const {
    207   base::TimeTicks min_time = run_state_->delayed_work_time;
    208   for (HandleToHandler::const_iterator i = handlers_.begin();
    209        i != handlers_.end(); ++i) {
    210     if (min_time.is_null() && i->second.deadline < min_time)
    211       min_time = i->second.deadline;
    212   }
    213   return min_time.is_null() ? MOJO_DEADLINE_INDEFINITE :
    214       std::max(static_cast<MojoDeadline>(0),
    215                static_cast<MojoDeadline>(
    216                    (min_time - base::TimeTicks::Now()).InMicroseconds()));
    217 }
    218 
    219 }  // namespace common
    220 }  // namespace mojo
    221