Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/system/local_message_pipe_endpoint.h"
      6 
      7 #include <string.h>
      8 
      9 #include "base/logging.h"
     10 #include "mojo/system/dispatcher.h"
     11 #include "mojo/system/message_in_transit.h"
     12 
     13 namespace mojo {
     14 namespace system {
     15 
     16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
     17     : is_open_(true), is_peer_open_(true) {
     18 }
     19 
     20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
     21   DCHECK(!is_open_);
     22   DCHECK(message_queue_.IsEmpty());  // Should be implied by not being open.
     23 }
     24 
     25 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
     26   return kTypeLocal;
     27 }
     28 
     29 bool LocalMessagePipeEndpoint::OnPeerClose() {
     30   DCHECK(is_open_);
     31   DCHECK(is_peer_open_);
     32 
     33   HandleSignalsState old_state = GetHandleSignalsState();
     34   is_peer_open_ = false;
     35   HandleSignalsState new_state = GetHandleSignalsState();
     36 
     37   if (!new_state.equals(old_state))
     38     waiter_list_.AwakeWaitersForStateChange(new_state);
     39 
     40   return true;
     41 }
     42 
     43 void LocalMessagePipeEndpoint::EnqueueMessage(
     44     scoped_ptr<MessageInTransit> message) {
     45   DCHECK(is_open_);
     46   DCHECK(is_peer_open_);
     47 
     48   bool was_empty = message_queue_.IsEmpty();
     49   message_queue_.AddMessage(message.Pass());
     50   if (was_empty)
     51     waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
     52 }
     53 
     54 void LocalMessagePipeEndpoint::Close() {
     55   DCHECK(is_open_);
     56   is_open_ = false;
     57   message_queue_.Clear();
     58 }
     59 
     60 void LocalMessagePipeEndpoint::CancelAllWaiters() {
     61   DCHECK(is_open_);
     62   waiter_list_.CancelAllWaiters();
     63 }
     64 
     65 MojoResult LocalMessagePipeEndpoint::ReadMessage(
     66     UserPointer<void> bytes,
     67     UserPointer<uint32_t> num_bytes,
     68     DispatcherVector* dispatchers,
     69     uint32_t* num_dispatchers,
     70     MojoReadMessageFlags flags) {
     71   DCHECK(is_open_);
     72   DCHECK(!dispatchers || dispatchers->empty());
     73 
     74   const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
     75   const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
     76 
     77   if (message_queue_.IsEmpty()) {
     78     return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT
     79                          : MOJO_RESULT_FAILED_PRECONDITION;
     80   }
     81 
     82   // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
     83   // and release the lock immediately.
     84   bool enough_space = true;
     85   MessageInTransit* message = message_queue_.PeekMessage();
     86   if (!num_bytes.IsNull())
     87     num_bytes.Put(message->num_bytes());
     88   if (message->num_bytes() <= max_bytes)
     89     bytes.PutArray(message->bytes(), message->num_bytes());
     90   else
     91     enough_space = false;
     92 
     93   if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
     94     if (num_dispatchers)
     95       *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
     96     if (enough_space) {
     97       if (queued_dispatchers->empty()) {
     98         // Nothing to do.
     99       } else if (queued_dispatchers->size() <= max_num_dispatchers) {
    100         DCHECK(dispatchers);
    101         dispatchers->swap(*queued_dispatchers);
    102       } else {
    103         enough_space = false;
    104       }
    105     }
    106   } else {
    107     if (num_dispatchers)
    108       *num_dispatchers = 0;
    109   }
    110 
    111   message = nullptr;
    112 
    113   if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
    114     message_queue_.DiscardMessage();
    115 
    116     // Now it's empty, thus no longer readable.
    117     if (message_queue_.IsEmpty()) {
    118       // It's currently not possible to wait for non-readability, but we should
    119       // do the state change anyway.
    120       waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
    121     }
    122   }
    123 
    124   if (!enough_space)
    125     return MOJO_RESULT_RESOURCE_EXHAUSTED;
    126 
    127   return MOJO_RESULT_OK;
    128 }
    129 
    130 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() const {
    131   HandleSignalsState rv;
    132   if (!message_queue_.IsEmpty()) {
    133     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    134     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
    135   }
    136   if (is_peer_open_) {
    137     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
    138     rv.satisfiable_signals |=
    139         MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
    140   }
    141   return rv;
    142 }
    143 
    144 MojoResult LocalMessagePipeEndpoint::AddWaiter(
    145     Waiter* waiter,
    146     MojoHandleSignals signals,
    147     uint32_t context,
    148     HandleSignalsState* signals_state) {
    149   DCHECK(is_open_);
    150 
    151   HandleSignalsState state = GetHandleSignalsState();
    152   if (state.satisfies(signals)) {
    153     if (signals_state)
    154       *signals_state = state;
    155     return MOJO_RESULT_ALREADY_EXISTS;
    156   }
    157   if (!state.can_satisfy(signals)) {
    158     if (signals_state)
    159       *signals_state = state;
    160     return MOJO_RESULT_FAILED_PRECONDITION;
    161   }
    162 
    163   waiter_list_.AddWaiter(waiter, signals, context);
    164   return MOJO_RESULT_OK;
    165 }
    166 
    167 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter,
    168                                             HandleSignalsState* signals_state) {
    169   DCHECK(is_open_);
    170   waiter_list_.RemoveWaiter(waiter);
    171   if (signals_state)
    172     *signals_state = GetHandleSignalsState();
    173 }
    174 
    175 }  // namespace system
    176 }  // namespace mojo
    177