1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/local_message_pipe_endpoint.h" 6 7 #include <string.h> 8 9 #include "base/logging.h" 10 #include "mojo/system/dispatcher.h" 11 #include "mojo/system/message_in_transit.h" 12 13 namespace mojo { 14 namespace system { 15 16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry() 17 : message(NULL) { 18 } 19 20 // See comment in header file. 21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry( 22 const MessageQueueEntry& other) 23 : message(NULL) { 24 DCHECK(!other.message); 25 DCHECK(other.dispatchers.empty()); 26 } 27 28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() { 29 if (message) 30 message->Destroy(); 31 // Close all the dispatchers. 32 for (size_t i = 0; i < dispatchers.size(); i++) { 33 // Note: Taking the |Dispatcher| locks is okay, since no one else should 34 // have a reference to the dispatchers (and the locks shouldn't be held). 35 DCHECK(dispatchers[i]->HasOneRef()); 36 dispatchers[i]->Close(); 37 } 38 } 39 40 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() 41 : is_open_(true), 42 is_peer_open_(true) { 43 } 44 45 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { 46 DCHECK(!is_open_); 47 } 48 49 void LocalMessagePipeEndpoint::Close() { 50 DCHECK(is_open_); 51 is_open_ = false; 52 message_queue_.clear(); 53 } 54 55 bool LocalMessagePipeEndpoint::OnPeerClose() { 56 DCHECK(is_open_); 57 DCHECK(is_peer_open_); 58 59 MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); 60 MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); 61 is_peer_open_ = false; 62 MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); 63 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); 64 65 if (new_satisfied_flags != old_satisfied_flags || 66 new_satisfiable_flags != old_satisfiable_flags) { 67 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, 68 new_satisfiable_flags); 69 } 70 71 return true; 72 } 73 74 MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage( 75 const MessageInTransit* /*message*/, 76 const std::vector<Dispatcher*>* /*dispatchers*/) { 77 return MOJO_RESULT_OK; 78 } 79 80 void LocalMessagePipeEndpoint::EnqueueMessage( 81 MessageInTransit* message, 82 std::vector<scoped_refptr<Dispatcher> >* dispatchers) { 83 DCHECK(is_open_); 84 DCHECK(is_peer_open_); 85 86 bool was_empty = message_queue_.empty(); 87 message_queue_.push_back(MessageQueueEntry()); 88 message_queue_.back().message = message; 89 if (dispatchers) { 90 #ifndef NDEBUG 91 // It's important that we're taking "ownership" of the dispatchers. In 92 // particular, they must not be in the global handle table (i.e., have live 93 // handles referring to them). If we need to destroy any queued messages, we 94 // need to know that any handles in them should be closed. 95 for (size_t i = 0; i < dispatchers->size(); i++) 96 DCHECK((*dispatchers)[i]->HasOneRef()); 97 #endif 98 message_queue_.back().dispatchers.swap(*dispatchers); 99 } 100 if (was_empty) { 101 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), 102 SatisfiableFlags()); 103 } 104 } 105 106 void LocalMessagePipeEndpoint::CancelAllWaiters() { 107 DCHECK(is_open_); 108 waiter_list_.CancelAllWaiters(); 109 } 110 111 MojoResult LocalMessagePipeEndpoint::ReadMessage( 112 void* bytes, uint32_t* num_bytes, 113 std::vector<scoped_refptr<Dispatcher> >* dispatchers, 114 uint32_t* num_dispatchers, 115 MojoReadMessageFlags flags) { 116 DCHECK(is_open_); 117 DCHECK(!dispatchers || dispatchers->empty()); 118 119 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; 120 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 121 122 if (message_queue_.empty()) { 123 return is_peer_open_ ? MOJO_RESULT_NOT_FOUND : 124 MOJO_RESULT_FAILED_PRECONDITION; 125 } 126 127 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 128 // and release the lock immediately. 129 bool enough_space = true; 130 const MessageInTransit* queued_message = message_queue_.front().message; 131 if (num_bytes) 132 *num_bytes = queued_message->data_size(); 133 if (queued_message->data_size() <= max_bytes) 134 memcpy(bytes, queued_message->data(), queued_message->data_size()); 135 else 136 enough_space = false; 137 138 std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers = 139 &message_queue_.front().dispatchers; 140 if (num_dispatchers) 141 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); 142 if (enough_space) { 143 if (queued_dispatchers->empty()) { 144 // Nothing to do. 145 } else if (queued_dispatchers->size() <= max_num_dispatchers) { 146 DCHECK(dispatchers); 147 dispatchers->swap(*queued_dispatchers); 148 } else { 149 enough_space = false; 150 } 151 } 152 153 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { 154 message_queue_.pop_front(); 155 156 // Now it's empty, thus no longer readable. 157 if (message_queue_.empty()) { 158 // It's currently not possible to wait for non-readability, but we should 159 // do the state change anyway. 160 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), 161 SatisfiableFlags()); 162 } 163 } 164 165 if (!enough_space) 166 return MOJO_RESULT_RESOURCE_EXHAUSTED; 167 168 return MOJO_RESULT_OK; 169 } 170 171 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, 172 MojoWaitFlags flags, 173 MojoResult wake_result) { 174 DCHECK(is_open_); 175 176 if ((flags & SatisfiedFlags())) 177 return MOJO_RESULT_ALREADY_EXISTS; 178 if (!(flags & SatisfiableFlags())) 179 return MOJO_RESULT_FAILED_PRECONDITION; 180 181 waiter_list_.AddWaiter(waiter, flags, wake_result); 182 return MOJO_RESULT_OK; 183 } 184 185 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { 186 DCHECK(is_open_); 187 waiter_list_.RemoveWaiter(waiter); 188 } 189 190 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() { 191 MojoWaitFlags satisfied_flags = 0; 192 if (!message_queue_.empty()) 193 satisfied_flags |= MOJO_WAIT_FLAG_READABLE; 194 if (is_peer_open_) 195 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; 196 return satisfied_flags; 197 } 198 199 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() { 200 MojoWaitFlags satisfiable_flags = 0; 201 if (!message_queue_.empty() || is_peer_open_) 202 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; 203 if (is_peer_open_) 204 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; 205 return satisfiable_flags; 206 } 207 208 } // namespace system 209 } // namespace mojo 210