1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/services/public/cpp/network/web_socket_write_queue.h" 6 7 #include "base/bind.h" 8 9 namespace mojo { 10 11 struct WebSocketWriteQueue::Operation { 12 uint32_t num_bytes_; 13 base::Callback<void(const char*)> callback_; 14 15 const char* data_; 16 // Only initialized if the initial Write fails. This saves a copy in 17 // the common case. 18 std::vector<char> data_copy_; 19 }; 20 21 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) 22 : handle_(handle), is_waiting_(false) { 23 } 24 25 WebSocketWriteQueue::~WebSocketWriteQueue() { 26 } 27 28 void WebSocketWriteQueue::Write(const char* data, 29 uint32_t num_bytes, 30 base::Callback<void(const char*)> callback) { 31 Operation* op = new Operation; 32 op->num_bytes_ = num_bytes; 33 op->callback_ = callback; 34 op->data_ = data; 35 queue_.push_back(op); 36 37 MojoResult result = MOJO_RESULT_SHOULD_WAIT; 38 if (!is_waiting_) 39 result = TryToWrite(); 40 41 // If we have to wait, make a local copy of the data so we know it will 42 // live until we need it. 43 if (result == MOJO_RESULT_SHOULD_WAIT) { 44 op->data_copy_.resize(num_bytes); 45 memcpy(&op->data_copy_[0], data, num_bytes); 46 op->data_ = &op->data_copy_[0]; 47 } 48 } 49 50 MojoResult WebSocketWriteQueue::TryToWrite() { 51 Operation* op = queue_[0]; 52 uint32_t bytes_written = op->num_bytes_; 53 MojoResult result = WriteDataRaw( 54 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); 55 if (result == MOJO_RESULT_SHOULD_WAIT) { 56 Wait(); 57 return result; 58 } 59 60 // Ensure |op| is deleted, whether or not |this| goes away. 61 scoped_ptr<Operation> op_deleter(op); 62 queue_.weak_erase(queue_.begin()); 63 if (result != MOJO_RESULT_OK) 64 return result; 65 66 op->callback_.Run(op->data_); // may delete |this| 67 return result; 68 } 69 70 void WebSocketWriteQueue::Wait() { 71 is_waiting_ = true; 72 handle_watcher_.Start(handle_, 73 MOJO_HANDLE_SIGNAL_WRITABLE, 74 MOJO_DEADLINE_INDEFINITE, 75 base::Bind(&WebSocketWriteQueue::OnHandleReady, 76 base::Unretained(this))); 77 } 78 79 void WebSocketWriteQueue::OnHandleReady(MojoResult result) { 80 is_waiting_ = false; 81 TryToWrite(); 82 } 83 84 } // namespace mojo 85