1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/data_pipe.h" 6 7 #include <string.h> 8 9 #include <algorithm> 10 #include <limits> 11 12 #include "base/compiler_specific.h" 13 #include "base/logging.h" 14 #include "mojo/system/constants.h" 15 #include "mojo/system/memory.h" 16 #include "mojo/system/options_validation.h" 17 #include "mojo/system/waiter_list.h" 18 19 namespace mojo { 20 namespace system { 21 22 // static 23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = { 24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 26 1u, 27 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes) 28 }; 29 30 // static 31 MojoResult DataPipe::ValidateCreateOptions( 32 const MojoCreateDataPipeOptions* in_options, 33 MojoCreateDataPipeOptions* out_options) { 34 const MojoCreateDataPipeOptionsFlags kKnownFlags = 35 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD; 36 37 *out_options = kDefaultCreateOptions; 38 if (!in_options) 39 return MOJO_RESULT_OK; 40 41 MojoResult result = 42 ValidateOptionsStructPointerSizeAndFlags<MojoCreateDataPipeOptions>( 43 in_options, kKnownFlags, out_options); 44 if (result != MOJO_RESULT_OK) 45 return result; 46 47 // Checks for fields beyond |flags|: 48 49 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, element_num_bytes, 50 in_options)) 51 return MOJO_RESULT_OK; 52 if (in_options->element_num_bytes == 0) 53 return MOJO_RESULT_INVALID_ARGUMENT; 54 out_options->element_num_bytes = in_options->element_num_bytes; 55 56 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes, 57 in_options) || 58 in_options->capacity_num_bytes == 0) { 59 // Round the default capacity down to a multiple of the element size (but at 60 // least one element). 61 out_options->capacity_num_bytes = std::max( 62 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes - 63 (kDefaultDataPipeCapacityBytes % out_options->element_num_bytes)), 64 out_options->element_num_bytes); 65 return MOJO_RESULT_OK; 66 } 67 if (in_options->capacity_num_bytes % out_options->element_num_bytes != 0) 68 return MOJO_RESULT_INVALID_ARGUMENT; 69 if (in_options->capacity_num_bytes > kMaxDataPipeCapacityBytes) 70 return MOJO_RESULT_RESOURCE_EXHAUSTED; 71 out_options->capacity_num_bytes = in_options->capacity_num_bytes; 72 73 return MOJO_RESULT_OK; 74 } 75 76 void DataPipe::ProducerCancelAllWaiters() { 77 base::AutoLock locker(lock_); 78 DCHECK(has_local_producer_no_lock()); 79 producer_waiter_list_->CancelAllWaiters(); 80 } 81 82 void DataPipe::ProducerClose() { 83 base::AutoLock locker(lock_); 84 DCHECK(producer_open_); 85 producer_open_ = false; 86 DCHECK(has_local_producer_no_lock()); 87 producer_waiter_list_.reset(); 88 // Not a bug, except possibly in "user" code. 89 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) 90 << "Producer closed with active two-phase write"; 91 producer_two_phase_max_num_bytes_written_ = 0; 92 ProducerCloseImplNoLock(); 93 AwakeConsumerWaitersForStateChangeNoLock( 94 ConsumerGetHandleSignalsStateNoLock()); 95 } 96 97 MojoResult DataPipe::ProducerWriteData(const void* elements, 98 uint32_t* num_bytes, 99 bool all_or_none) { 100 base::AutoLock locker(lock_); 101 DCHECK(has_local_producer_no_lock()); 102 103 if (producer_in_two_phase_write_no_lock()) 104 return MOJO_RESULT_BUSY; 105 if (!consumer_open_no_lock()) 106 return MOJO_RESULT_FAILED_PRECONDITION; 107 108 // Returning "busy" takes priority over "invalid argument". 109 if (*num_bytes % element_num_bytes_ != 0) 110 return MOJO_RESULT_INVALID_ARGUMENT; 111 112 if (*num_bytes == 0) 113 return MOJO_RESULT_OK; // Nothing to do. 114 115 HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock(); 116 MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none); 117 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); 118 if (!new_consumer_state.equals(old_consumer_state)) 119 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 120 return rv; 121 } 122 123 MojoResult DataPipe::ProducerBeginWriteData(void** buffer, 124 uint32_t* buffer_num_bytes, 125 bool all_or_none) { 126 base::AutoLock locker(lock_); 127 DCHECK(has_local_producer_no_lock()); 128 129 if (producer_in_two_phase_write_no_lock()) 130 return MOJO_RESULT_BUSY; 131 if (!consumer_open_no_lock()) 132 return MOJO_RESULT_FAILED_PRECONDITION; 133 134 if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0) 135 return MOJO_RESULT_INVALID_ARGUMENT; 136 137 MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, 138 all_or_none); 139 if (rv != MOJO_RESULT_OK) 140 return rv; 141 // Note: No need to awake producer waiters, even though we're going from 142 // writable to non-writable (since you can't wait on non-writability). 143 // Similarly, though this may have discarded data (in "may discard" mode), 144 // making it non-readable, there's still no need to awake consumer waiters. 145 DCHECK(producer_in_two_phase_write_no_lock()); 146 return MOJO_RESULT_OK; 147 } 148 149 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { 150 base::AutoLock locker(lock_); 151 DCHECK(has_local_producer_no_lock()); 152 153 if (!producer_in_two_phase_write_no_lock()) 154 return MOJO_RESULT_FAILED_PRECONDITION; 155 // Note: Allow successful completion of the two-phase write even if the 156 // consumer has been closed. 157 158 HandleSignalsState old_consumer_state = ConsumerGetHandleSignalsStateNoLock(); 159 MojoResult rv; 160 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || 161 num_bytes_written % element_num_bytes_ != 0) { 162 rv = MOJO_RESULT_INVALID_ARGUMENT; 163 producer_two_phase_max_num_bytes_written_ = 0; 164 } else { 165 rv = ProducerEndWriteDataImplNoLock(num_bytes_written); 166 } 167 // Two-phase write ended even on failure. 168 DCHECK(!producer_in_two_phase_write_no_lock()); 169 // If we're now writable, we *became* writable (since we weren't writable 170 // during the two-phase write), so awake producer waiters. 171 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); 172 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 173 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 174 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); 175 if (!new_consumer_state.equals(old_consumer_state)) 176 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 177 return rv; 178 } 179 180 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, 181 MojoHandleSignals signals, 182 uint32_t context) { 183 base::AutoLock locker(lock_); 184 DCHECK(has_local_producer_no_lock()); 185 186 HandleSignalsState producer_state = ProducerGetHandleSignalsStateNoLock(); 187 if (producer_state.satisfies(signals)) 188 return MOJO_RESULT_ALREADY_EXISTS; 189 if (!producer_state.can_satisfy(signals)) 190 return MOJO_RESULT_FAILED_PRECONDITION; 191 192 producer_waiter_list_->AddWaiter(waiter, signals, context); 193 return MOJO_RESULT_OK; 194 } 195 196 void DataPipe::ProducerRemoveWaiter(Waiter* waiter) { 197 base::AutoLock locker(lock_); 198 DCHECK(has_local_producer_no_lock()); 199 producer_waiter_list_->RemoveWaiter(waiter); 200 } 201 202 bool DataPipe::ProducerIsBusy() const { 203 base::AutoLock locker(lock_); 204 return producer_in_two_phase_write_no_lock(); 205 } 206 207 void DataPipe::ConsumerCancelAllWaiters() { 208 base::AutoLock locker(lock_); 209 DCHECK(has_local_consumer_no_lock()); 210 consumer_waiter_list_->CancelAllWaiters(); 211 } 212 213 void DataPipe::ConsumerClose() { 214 base::AutoLock locker(lock_); 215 DCHECK(consumer_open_); 216 consumer_open_ = false; 217 DCHECK(has_local_consumer_no_lock()); 218 consumer_waiter_list_.reset(); 219 // Not a bug, except possibly in "user" code. 220 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) 221 << "Consumer closed with active two-phase read"; 222 consumer_two_phase_max_num_bytes_read_ = 0; 223 ConsumerCloseImplNoLock(); 224 AwakeProducerWaitersForStateChangeNoLock( 225 ProducerGetHandleSignalsStateNoLock()); 226 } 227 228 MojoResult DataPipe::ConsumerReadData(void* elements, 229 uint32_t* num_bytes, 230 bool all_or_none) { 231 base::AutoLock locker(lock_); 232 DCHECK(has_local_consumer_no_lock()); 233 234 if (consumer_in_two_phase_read_no_lock()) 235 return MOJO_RESULT_BUSY; 236 237 if (*num_bytes % element_num_bytes_ != 0) 238 return MOJO_RESULT_INVALID_ARGUMENT; 239 240 if (*num_bytes == 0) 241 return MOJO_RESULT_OK; // Nothing to do. 242 243 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); 244 MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none); 245 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); 246 if (!new_producer_state.equals(old_producer_state)) 247 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 248 return rv; 249 } 250 251 MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes, 252 bool all_or_none) { 253 base::AutoLock locker(lock_); 254 DCHECK(has_local_consumer_no_lock()); 255 256 if (consumer_in_two_phase_read_no_lock()) 257 return MOJO_RESULT_BUSY; 258 259 if (*num_bytes % element_num_bytes_ != 0) 260 return MOJO_RESULT_INVALID_ARGUMENT; 261 262 if (*num_bytes == 0) 263 return MOJO_RESULT_OK; // Nothing to do. 264 265 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); 266 MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none); 267 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); 268 if (!new_producer_state.equals(old_producer_state)) 269 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 270 return rv; 271 } 272 273 MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) { 274 base::AutoLock locker(lock_); 275 DCHECK(has_local_consumer_no_lock()); 276 277 if (consumer_in_two_phase_read_no_lock()) 278 return MOJO_RESULT_BUSY; 279 280 // Note: Don't need to validate |*num_bytes| for query. 281 return ConsumerQueryDataImplNoLock(num_bytes); 282 } 283 284 MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, 285 uint32_t* buffer_num_bytes, 286 bool all_or_none) { 287 base::AutoLock locker(lock_); 288 DCHECK(has_local_consumer_no_lock()); 289 290 if (consumer_in_two_phase_read_no_lock()) 291 return MOJO_RESULT_BUSY; 292 293 if (all_or_none && *buffer_num_bytes % element_num_bytes_ != 0) 294 return MOJO_RESULT_INVALID_ARGUMENT; 295 296 MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, 297 all_or_none); 298 if (rv != MOJO_RESULT_OK) 299 return rv; 300 DCHECK(consumer_in_two_phase_read_no_lock()); 301 return MOJO_RESULT_OK; 302 } 303 304 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { 305 base::AutoLock locker(lock_); 306 DCHECK(has_local_consumer_no_lock()); 307 308 if (!consumer_in_two_phase_read_no_lock()) 309 return MOJO_RESULT_FAILED_PRECONDITION; 310 311 HandleSignalsState old_producer_state = ProducerGetHandleSignalsStateNoLock(); 312 MojoResult rv; 313 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || 314 num_bytes_read % element_num_bytes_ != 0) { 315 rv = MOJO_RESULT_INVALID_ARGUMENT; 316 consumer_two_phase_max_num_bytes_read_ = 0; 317 } else { 318 rv = ConsumerEndReadDataImplNoLock(num_bytes_read); 319 } 320 // Two-phase read ended even on failure. 321 DCHECK(!consumer_in_two_phase_read_no_lock()); 322 // If we're now readable, we *became* readable (since we weren't readable 323 // during the two-phase read), so awake consumer waiters. 324 HandleSignalsState new_consumer_state = ConsumerGetHandleSignalsStateNoLock(); 325 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) 326 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 327 HandleSignalsState new_producer_state = ProducerGetHandleSignalsStateNoLock(); 328 if (!new_producer_state.equals(old_producer_state)) 329 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 330 return rv; 331 } 332 333 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, 334 MojoHandleSignals signals, 335 uint32_t context) { 336 base::AutoLock locker(lock_); 337 DCHECK(has_local_consumer_no_lock()); 338 339 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateNoLock(); 340 if (consumer_state.satisfies(signals)) 341 return MOJO_RESULT_ALREADY_EXISTS; 342 if (!consumer_state.can_satisfy(signals)) 343 return MOJO_RESULT_FAILED_PRECONDITION; 344 345 consumer_waiter_list_->AddWaiter(waiter, signals, context); 346 return MOJO_RESULT_OK; 347 } 348 349 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) { 350 base::AutoLock locker(lock_); 351 DCHECK(has_local_consumer_no_lock()); 352 consumer_waiter_list_->RemoveWaiter(waiter); 353 } 354 355 bool DataPipe::ConsumerIsBusy() const { 356 base::AutoLock locker(lock_); 357 return consumer_in_two_phase_read_no_lock(); 358 } 359 360 361 DataPipe::DataPipe(bool has_local_producer, 362 bool has_local_consumer, 363 const MojoCreateDataPipeOptions& validated_options) 364 : may_discard_((validated_options.flags & 365 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), 366 element_num_bytes_(validated_options.element_num_bytes), 367 capacity_num_bytes_(validated_options.capacity_num_bytes), 368 producer_open_(true), 369 consumer_open_(true), 370 producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), 371 consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL), 372 producer_two_phase_max_num_bytes_written_(0), 373 consumer_two_phase_max_num_bytes_read_(0) { 374 // Check that the passed in options actually are validated. 375 MojoCreateDataPipeOptions unused ALLOW_UNUSED = { 0 }; 376 DCHECK_EQ(ValidateCreateOptions(&validated_options, &unused), MOJO_RESULT_OK); 377 } 378 379 DataPipe::~DataPipe() { 380 DCHECK(!producer_open_); 381 DCHECK(!consumer_open_); 382 DCHECK(!producer_waiter_list_); 383 DCHECK(!consumer_waiter_list_); 384 } 385 386 void DataPipe::AwakeProducerWaitersForStateChangeNoLock( 387 const HandleSignalsState& new_producer_state) { 388 lock_.AssertAcquired(); 389 if (!has_local_producer_no_lock()) 390 return; 391 producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state); 392 } 393 394 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock( 395 const HandleSignalsState& new_consumer_state) { 396 lock_.AssertAcquired(); 397 if (!has_local_consumer_no_lock()) 398 return; 399 consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state); 400 } 401 402 } // namespace system 403 } // namespace mojo 404