Home | History | Annotate | Download | only in serial
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "device/serial/data_receiver.h"
      6 
      7 #include <limits>
      8 
      9 #include "base/bind.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "device/serial/async_waiter.h"
     12 
     13 namespace device {
     14 
     15 // Represents a receive that is not yet fulfilled.
     16 class DataReceiver::PendingReceive {
     17  public:
     18   PendingReceive(DataReceiver* receiver,
     19                  const ReceiveDataCallback& callback,
     20                  const ReceiveErrorCallback& error_callback,
     21                  int32_t fatal_error_value);
     22 
     23   // Dispatches |data| to |receive_callback_|.
     24   void DispatchData(const void* data, uint32_t num_bytes);
     25 
     26   // Reports |error| to |receive_error_callback_| if it is an appropriate time.
     27   // Returns whether it dispatched |error|.
     28   bool DispatchError(DataReceiver::PendingError* error,
     29                      uint32_t bytes_received);
     30 
     31   // Reports |fatal_error_value_| to |receive_error_callback_|.
     32   void DispatchFatalError();
     33 
     34  private:
     35   class Buffer;
     36 
     37   // Invoked when the user is finished with the ReadOnlyBuffer provided to
     38   // |receive_callback_|.
     39   void Done(uint32_t num_bytes);
     40 
     41   // The DataReceiver that owns this.
     42   DataReceiver* receiver_;
     43 
     44   // The callback to dispatch data.
     45   ReceiveDataCallback receive_callback_;
     46 
     47   // The callback to report errors.
     48   ReceiveErrorCallback receive_error_callback_;
     49 
     50   // The error value to report when DispatchFatalError() is called.
     51   const int32_t fatal_error_value_;
     52 
     53   // True if the user owns a buffer passed to |receive_callback_| as part of
     54   // DispatchData().
     55   bool buffer_in_use_;
     56 };
     57 
     58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
     59 // a DataReceiver.
     60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
     61  public:
     62   Buffer(scoped_refptr<DataReceiver> pipe,
     63          PendingReceive* receive,
     64          const char* buffer,
     65          uint32_t buffer_size);
     66   virtual ~Buffer();
     67 
     68   // ReadOnlyBuffer overrides.
     69   virtual const char* GetData() OVERRIDE;
     70   virtual uint32_t GetSize() OVERRIDE;
     71   virtual void Done(uint32_t bytes_consumed) OVERRIDE;
     72   virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
     73 
     74  private:
     75   // The DataReceiver whose data pipe we are providing a view.
     76   scoped_refptr<DataReceiver> receiver_;
     77 
     78   // The PendingReceive to which this buffer has been created in response.
     79   PendingReceive* pending_receive_;
     80 
     81   const char* buffer_;
     82   uint32_t buffer_size_;
     83 };
     84 
     85 // Represents an error received from the DataSource.
     86 struct DataReceiver::PendingError {
     87   PendingError(uint32_t offset, int32_t error)
     88       : offset(offset), error(error), dispatched(false) {}
     89 
     90   // The location within the data stream where the error occurred.
     91   const uint32_t offset;
     92 
     93   // The value of the error that occurred.
     94   const int32_t error;
     95 
     96   // Whether the error has been dispatched to the user.
     97   bool dispatched;
     98 };
     99 
    100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
    101                            uint32_t buffer_size,
    102                            int32_t fatal_error_value)
    103     : source_(source.Pass()),
    104       fatal_error_value_(fatal_error_value),
    105       bytes_received_(0),
    106       shut_down_(false),
    107       weak_factory_(this) {
    108   MojoCreateDataPipeOptions options = {
    109       sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
    110   };
    111   mojo::ScopedDataPipeProducerHandle remote_handle;
    112   MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
    113   DCHECK_EQ(MOJO_RESULT_OK, result);
    114   source_->Init(remote_handle.Pass());
    115   source_.set_client(this);
    116 }
    117 
    118 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
    119                            const ReceiveErrorCallback& error_callback) {
    120   DCHECK(!callback.is_null() && !error_callback.is_null());
    121   if (pending_receive_ || shut_down_)
    122     return false;
    123   // When the DataSource encounters an error, it pauses transmission. When the
    124   // user starts a new receive following notification of the error (via
    125   // |error_callback| of the previous Receive call) of the error we can tell the
    126   // DataSource to resume transmission of data.
    127   if (pending_error_ && pending_error_->dispatched) {
    128     source_->Resume();
    129     pending_error_.reset();
    130   }
    131 
    132   pending_receive_.reset(
    133       new PendingReceive(this, callback, error_callback, fatal_error_value_));
    134   base::MessageLoop::current()->PostTask(
    135       FROM_HERE,
    136       base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
    137   return true;
    138 }
    139 
    140 DataReceiver::~DataReceiver() {
    141   ShutDown();
    142 }
    143 
    144 void DataReceiver::OnError(uint32_t offset, int32_t error) {
    145   if (shut_down_)
    146     return;
    147 
    148   if (pending_error_) {
    149     // When OnError is called by the DataSource, transmission of data is
    150     // suspended. Thus we shouldn't receive another call to OnError until we
    151     // have fully dealt with the error and called Resume to resume transmission
    152     // (see Receive()). Under normal operation we should never get here, but if
    153     // we do (e.g. in the case of a hijacked service process) just shut down.
    154     ShutDown();
    155     return;
    156   }
    157   pending_error_.reset(new PendingError(offset, error));
    158   if (pending_receive_ &&
    159       pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
    160     pending_receive_.reset();
    161     waiter_.reset();
    162   }
    163 }
    164 
    165 void DataReceiver::OnConnectionError() {
    166   ShutDown();
    167 }
    168 
    169 void DataReceiver::Done(uint32_t bytes_consumed) {
    170   if (shut_down_)
    171     return;
    172 
    173   DCHECK(pending_receive_);
    174   MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
    175   DCHECK_EQ(MOJO_RESULT_OK, result);
    176   pending_receive_.reset();
    177   bytes_received_ += bytes_consumed;
    178 }
    179 
    180 void DataReceiver::OnDoneWaiting(MojoResult result) {
    181   DCHECK(pending_receive_ && !shut_down_ && waiter_);
    182   waiter_.reset();
    183   if (result != MOJO_RESULT_OK) {
    184     ShutDown();
    185     return;
    186   }
    187   ReceiveInternal();
    188 }
    189 
    190 void DataReceiver::ReceiveInternal() {
    191   if (shut_down_)
    192     return;
    193   DCHECK(pending_receive_);
    194   if (pending_error_ &&
    195       pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
    196     pending_receive_.reset();
    197     waiter_.reset();
    198     return;
    199   }
    200 
    201   const void* data;
    202   uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
    203   MojoResult result = mojo::BeginReadDataRaw(
    204       handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
    205   if (result == MOJO_RESULT_OK) {
    206     if (!CheckErrorNotInReadRange(num_bytes)) {
    207       ShutDown();
    208       return;
    209     }
    210 
    211     pending_receive_->DispatchData(data, num_bytes);
    212     return;
    213   }
    214   if (result == MOJO_RESULT_SHOULD_WAIT) {
    215     waiter_.reset(new AsyncWaiter(
    216         handle_.get(),
    217         MOJO_HANDLE_SIGNAL_READABLE,
    218         base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
    219     return;
    220   }
    221   ShutDown();
    222 }
    223 
    224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
    225   DCHECK(pending_receive_);
    226   if (!pending_error_)
    227     return true;
    228 
    229   DCHECK_NE(bytes_received_, pending_error_->offset);
    230   DCHECK_NE(num_bytes, 0u);
    231   uint32_t potential_bytes_received = bytes_received_ + num_bytes;
    232   // bytes_received_ can overflow so we must consider two cases:
    233   //   1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
    234   //      equal number of times. In this case, |potential_bytes_received| must
    235   //      be in the range (|bytes_received|, |pending_error_->offset|]. Below
    236   //      this range can only occur if |bytes_received_| overflows before
    237   //      |pending_error_->offset|. Above can only occur if |bytes_received_|
    238   //      overtakes |pending_error_->offset|.
    239   //   2. |pending_error_->offset| has overflowed once more than
    240   //      |bytes_received_|. In this case, |potential_bytes_received| must not
    241   //      be in the range (|pending_error_->offset|, |bytes_received_|].
    242   if ((bytes_received_ < pending_error_->offset &&
    243        (potential_bytes_received > pending_error_->offset ||
    244         potential_bytes_received <= bytes_received_)) ||
    245       (bytes_received_ > pending_error_->offset &&
    246        potential_bytes_received > pending_error_->offset &&
    247        potential_bytes_received <= bytes_received_)) {
    248     return false;
    249   }
    250   return true;
    251 }
    252 
    253 void DataReceiver::ShutDown() {
    254   shut_down_ = true;
    255   if (pending_receive_)
    256     pending_receive_->DispatchFatalError();
    257   pending_error_.reset();
    258   waiter_.reset();
    259 }
    260 
    261 DataReceiver::PendingReceive::PendingReceive(
    262     DataReceiver* receiver,
    263     const ReceiveDataCallback& callback,
    264     const ReceiveErrorCallback& error_callback,
    265     int32_t fatal_error_value)
    266     : receiver_(receiver),
    267       receive_callback_(callback),
    268       receive_error_callback_(error_callback),
    269       fatal_error_value_(fatal_error_value),
    270       buffer_in_use_(false) {
    271 }
    272 
    273 void DataReceiver::PendingReceive::DispatchData(const void* data,
    274                                                 uint32_t num_bytes) {
    275   DCHECK(!buffer_in_use_);
    276   buffer_in_use_ = true;
    277   receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
    278       new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
    279 }
    280 
    281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
    282                                                  uint32_t bytes_received) {
    283   DCHECK(!error->dispatched);
    284   if (buffer_in_use_ || bytes_received != error->offset)
    285     return false;
    286 
    287   error->dispatched = true;
    288   receive_error_callback_.Run(error->error);
    289   return true;
    290 }
    291 
    292 void DataReceiver::PendingReceive::DispatchFatalError() {
    293   receive_error_callback_.Run(fatal_error_value_);
    294 }
    295 
    296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
    297   DCHECK(buffer_in_use_);
    298   buffer_in_use_ = false;
    299   receiver_->Done(bytes_consumed);
    300 }
    301 
    302 DataReceiver::PendingReceive::Buffer::Buffer(
    303     scoped_refptr<DataReceiver> receiver,
    304     PendingReceive* receive,
    305     const char* buffer,
    306     uint32_t buffer_size)
    307     : receiver_(receiver),
    308       pending_receive_(receive),
    309       buffer_(buffer),
    310       buffer_size_(buffer_size) {
    311 }
    312 
    313 DataReceiver::PendingReceive::Buffer::~Buffer() {
    314   if (pending_receive_)
    315     pending_receive_->Done(0);
    316 }
    317 
    318 const char* DataReceiver::PendingReceive::Buffer::GetData() {
    319   return buffer_;
    320 }
    321 
    322 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
    323   return buffer_size_;
    324 }
    325 
    326 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
    327   pending_receive_->Done(bytes_consumed);
    328   pending_receive_ = NULL;
    329   receiver_ = NULL;
    330   buffer_ = NULL;
    331   buffer_size_ = 0;
    332 }
    333 
    334 void DataReceiver::PendingReceive::Buffer::DoneWithError(
    335     uint32_t bytes_consumed,
    336     int32_t error) {
    337   Done(bytes_consumed);
    338 }
    339 
    340 }  // namespace device
    341