1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "device/serial/data_receiver.h" 6 7 #include <limits> 8 9 #include "base/bind.h" 10 #include "base/message_loop/message_loop.h" 11 #include "device/serial/async_waiter.h" 12 13 namespace device { 14 15 // Represents a receive that is not yet fulfilled. 16 class DataReceiver::PendingReceive { 17 public: 18 PendingReceive(DataReceiver* receiver, 19 const ReceiveDataCallback& callback, 20 const ReceiveErrorCallback& error_callback, 21 int32_t fatal_error_value); 22 23 // Dispatches |data| to |receive_callback_|. 24 void DispatchData(const void* data, uint32_t num_bytes); 25 26 // Reports |error| to |receive_error_callback_| if it is an appropriate time. 27 // Returns whether it dispatched |error|. 28 bool DispatchError(DataReceiver::PendingError* error, 29 uint32_t bytes_received); 30 31 // Reports |fatal_error_value_| to |receive_error_callback_|. 32 void DispatchFatalError(); 33 34 private: 35 class Buffer; 36 37 // Invoked when the user is finished with the ReadOnlyBuffer provided to 38 // |receive_callback_|. 39 void Done(uint32_t num_bytes); 40 41 // The DataReceiver that owns this. 42 DataReceiver* receiver_; 43 44 // The callback to dispatch data. 45 ReceiveDataCallback receive_callback_; 46 47 // The callback to report errors. 48 ReceiveErrorCallback receive_error_callback_; 49 50 // The error value to report when DispatchFatalError() is called. 51 const int32_t fatal_error_value_; 52 53 // True if the user owns a buffer passed to |receive_callback_| as part of 54 // DispatchData(). 55 bool buffer_in_use_; 56 }; 57 58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by 59 // a DataReceiver. 60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer { 61 public: 62 Buffer(scoped_refptr<DataReceiver> pipe, 63 PendingReceive* receive, 64 const char* buffer, 65 uint32_t buffer_size); 66 virtual ~Buffer(); 67 68 // ReadOnlyBuffer overrides. 69 virtual const char* GetData() OVERRIDE; 70 virtual uint32_t GetSize() OVERRIDE; 71 virtual void Done(uint32_t bytes_consumed) OVERRIDE; 72 virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE; 73 74 private: 75 // The DataReceiver whose data pipe we are providing a view. 76 scoped_refptr<DataReceiver> receiver_; 77 78 // The PendingReceive to which this buffer has been created in response. 79 PendingReceive* pending_receive_; 80 81 const char* buffer_; 82 uint32_t buffer_size_; 83 }; 84 85 // Represents an error received from the DataSource. 86 struct DataReceiver::PendingError { 87 PendingError(uint32_t offset, int32_t error) 88 : offset(offset), error(error), dispatched(false) {} 89 90 // The location within the data stream where the error occurred. 91 const uint32_t offset; 92 93 // The value of the error that occurred. 94 const int32_t error; 95 96 // Whether the error has been dispatched to the user. 97 bool dispatched; 98 }; 99 100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source, 101 uint32_t buffer_size, 102 int32_t fatal_error_value) 103 : source_(source.Pass()), 104 fatal_error_value_(fatal_error_value), 105 bytes_received_(0), 106 shut_down_(false), 107 weak_factory_(this) { 108 MojoCreateDataPipeOptions options = { 109 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, 110 }; 111 mojo::ScopedDataPipeProducerHandle remote_handle; 112 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_); 113 DCHECK_EQ(MOJO_RESULT_OK, result); 114 source_->Init(remote_handle.Pass()); 115 source_.set_client(this); 116 } 117 118 bool DataReceiver::Receive(const ReceiveDataCallback& callback, 119 const ReceiveErrorCallback& error_callback) { 120 DCHECK(!callback.is_null() && !error_callback.is_null()); 121 if (pending_receive_ || shut_down_) 122 return false; 123 // When the DataSource encounters an error, it pauses transmission. When the 124 // user starts a new receive following notification of the error (via 125 // |error_callback| of the previous Receive call) of the error we can tell the 126 // DataSource to resume transmission of data. 127 if (pending_error_ && pending_error_->dispatched) { 128 source_->Resume(); 129 pending_error_.reset(); 130 } 131 132 pending_receive_.reset( 133 new PendingReceive(this, callback, error_callback, fatal_error_value_)); 134 base::MessageLoop::current()->PostTask( 135 FROM_HERE, 136 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr())); 137 return true; 138 } 139 140 DataReceiver::~DataReceiver() { 141 ShutDown(); 142 } 143 144 void DataReceiver::OnError(uint32_t offset, int32_t error) { 145 if (shut_down_) 146 return; 147 148 if (pending_error_) { 149 // When OnError is called by the DataSource, transmission of data is 150 // suspended. Thus we shouldn't receive another call to OnError until we 151 // have fully dealt with the error and called Resume to resume transmission 152 // (see Receive()). Under normal operation we should never get here, but if 153 // we do (e.g. in the case of a hijacked service process) just shut down. 154 ShutDown(); 155 return; 156 } 157 pending_error_.reset(new PendingError(offset, error)); 158 if (pending_receive_ && 159 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { 160 pending_receive_.reset(); 161 waiter_.reset(); 162 } 163 } 164 165 void DataReceiver::OnConnectionError() { 166 ShutDown(); 167 } 168 169 void DataReceiver::Done(uint32_t bytes_consumed) { 170 if (shut_down_) 171 return; 172 173 DCHECK(pending_receive_); 174 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed); 175 DCHECK_EQ(MOJO_RESULT_OK, result); 176 pending_receive_.reset(); 177 bytes_received_ += bytes_consumed; 178 } 179 180 void DataReceiver::OnDoneWaiting(MojoResult result) { 181 DCHECK(pending_receive_ && !shut_down_ && waiter_); 182 waiter_.reset(); 183 if (result != MOJO_RESULT_OK) { 184 ShutDown(); 185 return; 186 } 187 ReceiveInternal(); 188 } 189 190 void DataReceiver::ReceiveInternal() { 191 if (shut_down_) 192 return; 193 DCHECK(pending_receive_); 194 if (pending_error_ && 195 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) { 196 pending_receive_.reset(); 197 waiter_.reset(); 198 return; 199 } 200 201 const void* data; 202 uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); 203 MojoResult result = mojo::BeginReadDataRaw( 204 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); 205 if (result == MOJO_RESULT_OK) { 206 if (!CheckErrorNotInReadRange(num_bytes)) { 207 ShutDown(); 208 return; 209 } 210 211 pending_receive_->DispatchData(data, num_bytes); 212 return; 213 } 214 if (result == MOJO_RESULT_SHOULD_WAIT) { 215 waiter_.reset(new AsyncWaiter( 216 handle_.get(), 217 MOJO_HANDLE_SIGNAL_READABLE, 218 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr()))); 219 return; 220 } 221 ShutDown(); 222 } 223 224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) { 225 DCHECK(pending_receive_); 226 if (!pending_error_) 227 return true; 228 229 DCHECK_NE(bytes_received_, pending_error_->offset); 230 DCHECK_NE(num_bytes, 0u); 231 uint32_t potential_bytes_received = bytes_received_ + num_bytes; 232 // bytes_received_ can overflow so we must consider two cases: 233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an 234 // equal number of times. In this case, |potential_bytes_received| must 235 // be in the range (|bytes_received|, |pending_error_->offset|]. Below 236 // this range can only occur if |bytes_received_| overflows before 237 // |pending_error_->offset|. Above can only occur if |bytes_received_| 238 // overtakes |pending_error_->offset|. 239 // 2. |pending_error_->offset| has overflowed once more than 240 // |bytes_received_|. In this case, |potential_bytes_received| must not 241 // be in the range (|pending_error_->offset|, |bytes_received_|]. 242 if ((bytes_received_ < pending_error_->offset && 243 (potential_bytes_received > pending_error_->offset || 244 potential_bytes_received <= bytes_received_)) || 245 (bytes_received_ > pending_error_->offset && 246 potential_bytes_received > pending_error_->offset && 247 potential_bytes_received <= bytes_received_)) { 248 return false; 249 } 250 return true; 251 } 252 253 void DataReceiver::ShutDown() { 254 shut_down_ = true; 255 if (pending_receive_) 256 pending_receive_->DispatchFatalError(); 257 pending_error_.reset(); 258 waiter_.reset(); 259 } 260 261 DataReceiver::PendingReceive::PendingReceive( 262 DataReceiver* receiver, 263 const ReceiveDataCallback& callback, 264 const ReceiveErrorCallback& error_callback, 265 int32_t fatal_error_value) 266 : receiver_(receiver), 267 receive_callback_(callback), 268 receive_error_callback_(error_callback), 269 fatal_error_value_(fatal_error_value), 270 buffer_in_use_(false) { 271 } 272 273 void DataReceiver::PendingReceive::DispatchData(const void* data, 274 uint32_t num_bytes) { 275 DCHECK(!buffer_in_use_); 276 buffer_in_use_ = true; 277 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>( 278 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes))); 279 } 280 281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error, 282 uint32_t bytes_received) { 283 DCHECK(!error->dispatched); 284 if (buffer_in_use_ || bytes_received != error->offset) 285 return false; 286 287 error->dispatched = true; 288 receive_error_callback_.Run(error->error); 289 return true; 290 } 291 292 void DataReceiver::PendingReceive::DispatchFatalError() { 293 receive_error_callback_.Run(fatal_error_value_); 294 } 295 296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) { 297 DCHECK(buffer_in_use_); 298 buffer_in_use_ = false; 299 receiver_->Done(bytes_consumed); 300 } 301 302 DataReceiver::PendingReceive::Buffer::Buffer( 303 scoped_refptr<DataReceiver> receiver, 304 PendingReceive* receive, 305 const char* buffer, 306 uint32_t buffer_size) 307 : receiver_(receiver), 308 pending_receive_(receive), 309 buffer_(buffer), 310 buffer_size_(buffer_size) { 311 } 312 313 DataReceiver::PendingReceive::Buffer::~Buffer() { 314 if (pending_receive_) 315 pending_receive_->Done(0); 316 } 317 318 const char* DataReceiver::PendingReceive::Buffer::GetData() { 319 return buffer_; 320 } 321 322 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() { 323 return buffer_size_; 324 } 325 326 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) { 327 pending_receive_->Done(bytes_consumed); 328 pending_receive_ = NULL; 329 receiver_ = NULL; 330 buffer_ = NULL; 331 buffer_size_ = 0; 332 } 333 334 void DataReceiver::PendingReceive::Buffer::DoneWithError( 335 uint32_t bytes_consumed, 336 int32_t error) { 337 Done(bytes_consumed); 338 } 339 340 } // namespace device 341