Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/local_message_pipe_endpoint.h"
      6 
      7 #include <string.h>
      8 
      9 #include "base/logging.h"
     10 #include "mojo/system/dispatcher.h"
     11 #include "mojo/system/message_in_transit.h"
     12 
     13 namespace mojo {
     14 namespace system {
     15 
     16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
     17     : message(NULL) {
     18 }
     19 
     20 // See comment in header file.
     21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
     22     const MessageQueueEntry& other)
     23     : message(NULL) {
     24   DCHECK(!other.message);
     25   DCHECK(other.dispatchers.empty());
     26 }
     27 
     28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
     29   if (message)
     30     message->Destroy();
     31   // Close all the dispatchers.
     32   for (size_t i = 0; i < dispatchers.size(); i++) {
     33     // Note: Taking the |Dispatcher| locks is okay, since no one else should
     34     // have a reference to the dispatchers (and the locks shouldn't be held).
     35     DCHECK(dispatchers[i]->HasOneRef());
     36     dispatchers[i]->Close();
     37   }
     38 }
     39 
     40 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
     41     : is_open_(true),
     42       is_peer_open_(true) {
     43 }
     44 
     45 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
     46   DCHECK(!is_open_);
     47 }
     48 
     49 void LocalMessagePipeEndpoint::Close() {
     50   DCHECK(is_open_);
     51   is_open_ = false;
     52   message_queue_.clear();
     53 }
     54 
     55 bool LocalMessagePipeEndpoint::OnPeerClose() {
     56   DCHECK(is_open_);
     57   DCHECK(is_peer_open_);
     58 
     59   MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
     60   MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
     61   is_peer_open_ = false;
     62   MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
     63   MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
     64 
     65   if (new_satisfied_flags != old_satisfied_flags ||
     66       new_satisfiable_flags != old_satisfiable_flags) {
     67     waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
     68                                             new_satisfiable_flags);
     69   }
     70 
     71   return true;
     72 }
     73 
     74 MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage(
     75     const MessageInTransit* /*message*/,
     76     const std::vector<Dispatcher*>* /*dispatchers*/) {
     77   return MOJO_RESULT_OK;
     78 }
     79 
     80 void LocalMessagePipeEndpoint::EnqueueMessage(
     81     MessageInTransit* message,
     82     std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
     83   DCHECK(is_open_);
     84   DCHECK(is_peer_open_);
     85 
     86   bool was_empty = message_queue_.empty();
     87   message_queue_.push_back(MessageQueueEntry());
     88   message_queue_.back().message = message;
     89   if (dispatchers) {
     90 #ifndef NDEBUG
     91     // It's important that we're taking "ownership" of the dispatchers. In
     92     // particular, they must not be in the global handle table (i.e., have live
     93     // handles referring to them). If we need to destroy any queued messages, we
     94     // need to know that any handles in them should be closed.
     95     for (size_t i = 0; i < dispatchers->size(); i++)
     96       DCHECK((*dispatchers)[i]->HasOneRef());
     97 #endif
     98     message_queue_.back().dispatchers.swap(*dispatchers);
     99   }
    100   if (was_empty) {
    101     waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
    102                                             SatisfiableFlags());
    103   }
    104 }
    105 
    106 void LocalMessagePipeEndpoint::CancelAllWaiters() {
    107   DCHECK(is_open_);
    108   waiter_list_.CancelAllWaiters();
    109 }
    110 
    111 MojoResult LocalMessagePipeEndpoint::ReadMessage(
    112     void* bytes, uint32_t* num_bytes,
    113     std::vector<scoped_refptr<Dispatcher> >* dispatchers,
    114     uint32_t* num_dispatchers,
    115     MojoReadMessageFlags flags) {
    116   DCHECK(is_open_);
    117   DCHECK(!dispatchers || dispatchers->empty());
    118 
    119   const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
    120   const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
    121 
    122   if (message_queue_.empty()) {
    123     return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
    124                            MOJO_RESULT_FAILED_PRECONDITION;
    125   }
    126 
    127   // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
    128   // and release the lock immediately.
    129   bool enough_space = true;
    130   const MessageInTransit* queued_message = message_queue_.front().message;
    131   if (num_bytes)
    132     *num_bytes = queued_message->data_size();
    133   if (queued_message->data_size() <= max_bytes)
    134     memcpy(bytes, queued_message->data(), queued_message->data_size());
    135   else
    136     enough_space = false;
    137 
    138   std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
    139       &message_queue_.front().dispatchers;
    140   if (num_dispatchers)
    141     *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
    142   if (enough_space) {
    143     if (queued_dispatchers->empty()) {
    144       // Nothing to do.
    145     } else if (queued_dispatchers->size() <= max_num_dispatchers) {
    146       DCHECK(dispatchers);
    147       dispatchers->swap(*queued_dispatchers);
    148     } else {
    149       enough_space = false;
    150     }
    151   }
    152 
    153   if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
    154     message_queue_.pop_front();
    155 
    156     // Now it's empty, thus no longer readable.
    157     if (message_queue_.empty()) {
    158       // It's currently not possible to wait for non-readability, but we should
    159       // do the state change anyway.
    160       waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
    161                                               SatisfiableFlags());
    162     }
    163   }
    164 
    165   if (!enough_space)
    166     return MOJO_RESULT_RESOURCE_EXHAUSTED;
    167 
    168   return MOJO_RESULT_OK;
    169 }
    170 
    171 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
    172                                                MojoWaitFlags flags,
    173                                                MojoResult wake_result) {
    174   DCHECK(is_open_);
    175 
    176   if ((flags & SatisfiedFlags()))
    177     return MOJO_RESULT_ALREADY_EXISTS;
    178   if (!(flags & SatisfiableFlags()))
    179     return MOJO_RESULT_FAILED_PRECONDITION;
    180 
    181   waiter_list_.AddWaiter(waiter, flags, wake_result);
    182   return MOJO_RESULT_OK;
    183 }
    184 
    185 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
    186   DCHECK(is_open_);
    187   waiter_list_.RemoveWaiter(waiter);
    188 }
    189 
    190 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
    191   MojoWaitFlags satisfied_flags = 0;
    192   if (!message_queue_.empty())
    193     satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
    194   if (is_peer_open_)
    195     satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
    196   return satisfied_flags;
    197 }
    198 
    199 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
    200   MojoWaitFlags satisfiable_flags = 0;
    201   if (!message_queue_.empty() || is_peer_open_)
    202     satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
    203   if (is_peer_open_)
    204     satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
    205   return satisfiable_flags;
    206 }
    207 
    208 }  // namespace system
    209 }  // namespace mojo
    210