1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "device/serial/data_source_sender.h" 6 7 #include <limits> 8 9 #include "base/bind.h" 10 #include "base/message_loop/message_loop.h" 11 #include "device/serial/async_waiter.h" 12 13 namespace device { 14 15 // Represents a send that is not yet fulfilled. 16 class DataSourceSender::PendingSend { 17 public: 18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback); 19 20 // Asynchronously fills |data| with up to |num_bytes| of data. Following this, 21 // one of Done() and DoneWithError() will be called with the result. 22 void GetData(void* data, uint32_t num_bytes); 23 24 private: 25 class Buffer; 26 // Reports a successful write of |bytes_written|. 27 void Done(uint32_t bytes_written); 28 29 // Reports a partially successful or unsuccessful write of |bytes_written| 30 // with an error of |error|. 31 void DoneWithError(uint32_t bytes_written, int32_t error); 32 33 // The DataSourceSender that owns this. 34 DataSourceSender* sender_; 35 36 // The callback to call to get data. 37 ReadyCallback callback_; 38 39 // Whether the buffer specified by GetData() has been passed to |callback_|, 40 // but has not yet called Done() or DoneWithError(). 41 bool buffer_in_use_; 42 }; 43 44 // A Writable implementation that provides a view of a data pipe owned by a 45 // DataSourceSender. 46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer { 47 public: 48 Buffer(scoped_refptr<DataSourceSender> sender, 49 PendingSend* send, 50 char* buffer, 51 uint32_t buffer_size); 52 virtual ~Buffer(); 53 54 // WritableBuffer overrides. 55 virtual char* GetData() OVERRIDE; 56 virtual uint32_t GetSize() OVERRIDE; 57 virtual void Done(uint32_t bytes_written) OVERRIDE; 58 virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE; 59 60 private: 61 // The DataSourceSender whose data pipe we are providing a view. 62 scoped_refptr<DataSourceSender> sender_; 63 64 // The PendingSend to which this buffer has been created in response. 65 PendingSend* pending_send_; 66 67 char* buffer_; 68 uint32_t buffer_size_; 69 }; 70 71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, 72 const ErrorCallback& error_callback) 73 : ready_callback_(ready_callback), 74 error_callback_(error_callback), 75 bytes_sent_(0), 76 shut_down_(false) { 77 DCHECK(!ready_callback.is_null() && !error_callback.is_null()); 78 } 79 80 void DataSourceSender::ShutDown() { 81 shut_down_ = true; 82 waiter_.reset(); 83 ready_callback_.Reset(); 84 error_callback_.Reset(); 85 } 86 87 DataSourceSender::~DataSourceSender() { 88 DCHECK(shut_down_); 89 } 90 91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { 92 // This should never occur. |handle_| is only valid and |pending_send_| is 93 // only set after Init is called. 94 if (pending_send_ || handle_.is_valid() || shut_down_) { 95 DispatchFatalError(); 96 return; 97 } 98 handle_ = handle.Pass(); 99 pending_send_.reset(new PendingSend(this, ready_callback_)); 100 StartWaiting(); 101 } 102 103 void DataSourceSender::Resume() { 104 if (pending_send_ || !handle_.is_valid()) { 105 DispatchFatalError(); 106 return; 107 } 108 109 pending_send_.reset(new PendingSend(this, ready_callback_)); 110 StartWaiting(); 111 } 112 113 void DataSourceSender::OnConnectionError() { 114 DispatchFatalError(); 115 } 116 117 void DataSourceSender::StartWaiting() { 118 DCHECK(pending_send_ && !waiter_); 119 waiter_.reset( 120 new AsyncWaiter(handle_.get(), 121 MOJO_HANDLE_SIGNAL_WRITABLE, 122 base::Bind(&DataSourceSender::OnDoneWaiting, this))); 123 } 124 125 void DataSourceSender::OnDoneWaiting(MojoResult result) { 126 DCHECK(pending_send_ && !shut_down_ && waiter_); 127 waiter_.reset(); 128 if (result != MOJO_RESULT_OK) { 129 DispatchFatalError(); 130 return; 131 } 132 void* data = NULL; 133 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); 134 result = mojo::BeginWriteDataRaw( 135 handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); 136 if (result != MOJO_RESULT_OK) { 137 DispatchFatalError(); 138 return; 139 } 140 pending_send_->GetData(static_cast<char*>(data), num_bytes); 141 } 142 143 void DataSourceSender::Done(uint32_t bytes_written) { 144 DoneInternal(bytes_written); 145 if (!shut_down_) 146 StartWaiting(); 147 } 148 149 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { 150 DoneInternal(bytes_written); 151 pending_send_.reset(); 152 if (!shut_down_) 153 client()->OnError(bytes_sent_, error); 154 // We don't call StartWaiting here so we don't send any additional data until 155 // Resume() is called. 156 } 157 158 void DataSourceSender::DoneInternal(uint32_t bytes_written) { 159 DCHECK(pending_send_); 160 if (shut_down_) 161 return; 162 163 bytes_sent_ += bytes_written; 164 MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); 165 if (result != MOJO_RESULT_OK) { 166 DispatchFatalError(); 167 return; 168 } 169 } 170 171 void DataSourceSender::DispatchFatalError() { 172 if (shut_down_) 173 return; 174 175 error_callback_.Run(); 176 ShutDown(); 177 } 178 179 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, 180 const ReadyCallback& callback) 181 : sender_(sender), callback_(callback), buffer_in_use_(false) { 182 } 183 184 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { 185 DCHECK(!buffer_in_use_); 186 buffer_in_use_ = true; 187 callback_.Run(scoped_ptr<WritableBuffer>( 188 new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); 189 } 190 191 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { 192 DCHECK(buffer_in_use_); 193 buffer_in_use_ = false; 194 sender_->Done(bytes_written); 195 } 196 197 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, 198 int32_t error) { 199 DCHECK(buffer_in_use_); 200 buffer_in_use_ = false; 201 sender_->DoneWithError(bytes_written, error); 202 } 203 204 DataSourceSender::PendingSend::Buffer::Buffer( 205 scoped_refptr<DataSourceSender> sender, 206 PendingSend* send, 207 char* buffer, 208 uint32_t buffer_size) 209 : sender_(sender), 210 pending_send_(send), 211 buffer_(buffer), 212 buffer_size_(buffer_size) { 213 } 214 215 DataSourceSender::PendingSend::Buffer::~Buffer() { 216 if (sender_.get()) 217 pending_send_->Done(0); 218 } 219 220 char* DataSourceSender::PendingSend::Buffer::GetData() { 221 return buffer_; 222 } 223 224 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { 225 return buffer_size_; 226 } 227 228 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { 229 DCHECK(sender_.get()); 230 pending_send_->Done(bytes_written); 231 sender_ = NULL; 232 pending_send_ = NULL; 233 buffer_ = NULL; 234 buffer_size_ = 0; 235 } 236 237 void DataSourceSender::PendingSend::Buffer::DoneWithError( 238 uint32_t bytes_written, 239 int32_t error) { 240 DCHECK(sender_.get()); 241 pending_send_->DoneWithError(bytes_written, error); 242 sender_ = NULL; 243 pending_send_ = NULL; 244 buffer_ = NULL; 245 buffer_size_ = 0; 246 } 247 248 } // namespace device 249