1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/data_pipe.h" 6 7 #include <string.h> 8 9 #include <algorithm> 10 #include <limits> 11 12 #include "base/logging.h" 13 #include "mojo/system/constants.h" 14 #include "mojo/system/memory.h" 15 #include "mojo/system/waiter_list.h" 16 17 namespace mojo { 18 namespace system { 19 20 void DataPipe::ProducerCancelAllWaiters() { 21 base::AutoLock locker(lock_); 22 DCHECK(has_local_producer_no_lock()); 23 producer_waiter_list_->CancelAllWaiters(); 24 } 25 26 void DataPipe::ProducerClose() { 27 base::AutoLock locker(lock_); 28 DCHECK(has_local_producer_no_lock()); 29 producer_waiter_list_.reset(); 30 ProducerCloseImplNoLock(); 31 } 32 33 MojoResult DataPipe::ProducerWriteData(const void* elements, 34 uint32_t* num_elements, 35 MojoWriteDataFlags flags) { 36 base::AutoLock locker(lock_); 37 DCHECK(has_local_producer_no_lock()); 38 39 if (producer_in_two_phase_write_) 40 return MOJO_RESULT_BUSY; 41 42 // TODO(vtl): This implementation may write less than requested, even if room 43 // is available. Fix this. (Probably make a subclass-specific impl.) 44 void* buffer = NULL; 45 uint32_t buffer_num_elements = *num_elements; 46 MojoResult rv = ProducerBeginWriteDataImplNoLock(&buffer, 47 &buffer_num_elements, 48 flags); 49 if (rv != MOJO_RESULT_OK) 50 return rv; 51 52 uint32_t num_elements_to_write = std::min(*num_elements, buffer_num_elements); 53 memcpy(buffer, elements, num_elements_to_write * element_size_); 54 55 rv = ProducerEndWriteDataImplNoLock(num_elements_to_write); 56 if (rv != MOJO_RESULT_OK) 57 return rv; 58 59 *num_elements = num_elements_to_write; 60 return MOJO_RESULT_OK; 61 } 62 63 MojoResult DataPipe::ProducerBeginWriteData(void** buffer, 64 uint32_t* buffer_num_elements, 65 MojoWriteDataFlags flags) { 66 base::AutoLock locker(lock_); 67 DCHECK(has_local_producer_no_lock()); 68 69 if (producer_in_two_phase_write_) 70 return MOJO_RESULT_BUSY; 71 72 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, 73 buffer_num_elements, 74 flags); 75 if (rv != MOJO_RESULT_OK) 76 return rv; 77 78 producer_in_two_phase_write_ = true; 79 return MOJO_RESULT_OK; 80 } 81 82 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) { 83 base::AutoLock locker(lock_); 84 DCHECK(has_local_producer_no_lock()); 85 86 if (!producer_in_two_phase_write_) 87 return MOJO_RESULT_FAILED_PRECONDITION; 88 89 MojoResult rv = ProducerEndWriteDataImplNoLock(num_elements_written); 90 producer_in_two_phase_write_ = false; // End two-phase write even on failure. 91 return rv; 92 } 93 94 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, 95 MojoWaitFlags flags, 96 MojoResult wake_result) { 97 base::AutoLock locker(lock_); 98 DCHECK(has_local_producer_no_lock()); 99 100 if ((flags & ProducerSatisfiedFlagsNoLock())) 101 return MOJO_RESULT_ALREADY_EXISTS; 102 if (!(flags & ProducerSatisfiableFlagsNoLock())) 103 return MOJO_RESULT_FAILED_PRECONDITION; 104 105 producer_waiter_list_->AddWaiter(waiter, flags, wake_result); 106 return MOJO_RESULT_OK; 107 } 108 109 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) { 110 base::AutoLock locker(lock_); 111 DCHECK(has_local_producer_no_lock()); 112 producer_waiter_list_->RemoveWaiter(waiter); 113 } 114 115 void DataPipe::ConsumerCancelAllWaiters() { 116 base::AutoLock locker(lock_); 117 DCHECK(has_local_consumer_no_lock()); 118 consumer_waiter_list_->CancelAllWaiters(); 119 } 120 121 void DataPipe::ConsumerClose() { 122 base::AutoLock locker(lock_); 123 DCHECK(has_local_consumer_no_lock()); 124 consumer_waiter_list_.reset(); 125 ConsumerCloseImplNoLock(); 126 } 127 128 MojoResult DataPipe::ConsumerReadData(void* elements, 129 uint32_t* num_elements, 130 MojoReadDataFlags flags) { 131 base::AutoLock locker(lock_); 132 DCHECK(has_local_consumer_no_lock()); 133 134 if (consumer_in_two_phase_read_) 135 return MOJO_RESULT_BUSY; 136 137 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 138 return ConsumerDiscardDataNoLock(num_elements, 139 (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE)); 140 } 141 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) 142 return ConsumerQueryDataNoLock(num_elements); 143 144 // TODO(vtl): This implementation may write less than requested, even if room 145 // is available. Fix this. (Probably make a subclass-specific impl.) 146 const void* buffer = NULL; 147 uint32_t buffer_num_elements = 0; 148 MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer, 149 &buffer_num_elements, 150 flags); 151 if (rv != MOJO_RESULT_OK) 152 return rv; 153 154 uint32_t num_elements_to_read = std::min(*num_elements, buffer_num_elements); 155 memcpy(elements, buffer, num_elements_to_read * element_size_); 156 157 rv = ConsumerEndReadDataImplNoLock(num_elements_to_read); 158 if (rv != MOJO_RESULT_OK) 159 return rv; 160 161 *num_elements = num_elements_to_read; 162 return MOJO_RESULT_OK; 163 } 164 165 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, 166 uint32_t* buffer_num_elements, 167 MojoReadDataFlags flags) { 168 base::AutoLock locker(lock_); 169 DCHECK(has_local_consumer_no_lock()); 170 171 if (consumer_in_two_phase_read_) 172 return MOJO_RESULT_BUSY; 173 174 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, 175 buffer_num_elements, 176 flags); 177 if (rv != MOJO_RESULT_OK) 178 return rv; 179 180 consumer_in_two_phase_read_ = true; 181 return MOJO_RESULT_OK; 182 } 183 184 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) { 185 base::AutoLock locker(lock_); 186 DCHECK(has_local_consumer_no_lock()); 187 188 if (!consumer_in_two_phase_read_) 189 return MOJO_RESULT_FAILED_PRECONDITION; 190 191 MojoResult rv = ConsumerEndReadDataImplNoLock(num_elements_read); 192 consumer_in_two_phase_read_ = false; // End two-phase read even on failure. 193 return rv; 194 } 195 196 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, 197 MojoWaitFlags flags, 198 MojoResult wake_result) { 199 base::AutoLock locker(lock_); 200 DCHECK(has_local_consumer_no_lock()); 201 202 if ((flags & ConsumerSatisfiedFlagsNoLock())) 203 return MOJO_RESULT_ALREADY_EXISTS; 204 if (!(flags & ConsumerSatisfiableFlagsNoLock())) 205 return MOJO_RESULT_FAILED_PRECONDITION; 206 207 consumer_waiter_list_->AddWaiter(waiter, flags, wake_result); 208 return MOJO_RESULT_OK; 209 } 210 211 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) { 212 base::AutoLock locker(lock_); 213 DCHECK(has_local_consumer_no_lock()); 214 consumer_waiter_list_->RemoveWaiter(waiter); 215 } 216 217 DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer) 218 : element_size_(0), 219 producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), 220 consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL), 221 producer_in_two_phase_write_(false), 222 consumer_in_two_phase_read_(false) { 223 DCHECK(has_local_producer || has_local_consumer); 224 } 225 226 DataPipe::~DataPipe() { 227 DCHECK(!has_local_producer_no_lock()); 228 DCHECK(!has_local_consumer_no_lock()); 229 } 230 231 MojoResult DataPipe::Init(bool may_discard, 232 size_t element_size, 233 size_t capacity_num_elements) { 234 // No need to lock: This method is not thread-safe. 235 236 if (element_size == 0) 237 return MOJO_RESULT_INVALID_ARGUMENT; 238 if (!capacity_num_elements) { 239 // Set the capacity to the default (rounded down by element size, but always 240 // at least one element). 241 capacity_num_elements = 242 std::max(static_cast<size_t>(1), 243 kDefaultDataPipeCapacityBytes / element_size); 244 } 245 if (capacity_num_elements > 246 std::numeric_limits<uint32_t>::max() / element_size) 247 return MOJO_RESULT_INVALID_ARGUMENT; 248 if (capacity_num_elements * element_size > kMaxDataPipeCapacityBytes) 249 return MOJO_RESULT_RESOURCE_EXHAUSTED; 250 251 may_discard_ = may_discard; 252 element_size_ = element_size; 253 capacity_num_elements_ = capacity_num_elements; 254 return MOJO_RESULT_OK; 255 } 256 257 void DataPipe::AwakeProducerWaitersForStateChangeNoLock() { 258 lock_.AssertAcquired(); 259 if (!has_local_producer_no_lock()) 260 return; 261 producer_waiter_list_->AwakeWaitersForStateChange( 262 ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock()); 263 } 264 265 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() { 266 lock_.AssertAcquired(); 267 if (!has_local_consumer_no_lock()) 268 return; 269 consumer_waiter_list_->AwakeWaitersForStateChange( 270 ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock()); 271 } 272 273 } // namespace system 274 } // namespace mojo 275