Home | History | Annotate | Download | only in serial
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "device/serial/data_source_sender.h"
      6 
      7 #include <limits>
      8 
      9 #include "base/bind.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "device/serial/async_waiter.h"
     12 
     13 namespace device {
     14 
     15 // Represents a send that is not yet fulfilled.
     16 class DataSourceSender::PendingSend {
     17  public:
     18   PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
     19 
     20   // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
     21   // one of Done() and DoneWithError() will be called with the result.
     22   void GetData(void* data, uint32_t num_bytes);
     23 
     24  private:
     25   class Buffer;
     26   // Reports a successful write of |bytes_written|.
     27   void Done(uint32_t bytes_written);
     28 
     29   // Reports a partially successful or unsuccessful write of |bytes_written|
     30   // with an error of |error|.
     31   void DoneWithError(uint32_t bytes_written, int32_t error);
     32 
     33   // The DataSourceSender that owns this.
     34   DataSourceSender* sender_;
     35 
     36   // The callback to call to get data.
     37   ReadyCallback callback_;
     38 
     39   // Whether the buffer specified by GetData() has been passed to |callback_|,
     40   // but has not yet called Done() or DoneWithError().
     41   bool buffer_in_use_;
     42 };
     43 
     44 // A Writable implementation that provides a view of a data pipe owned by a
     45 // DataSourceSender.
     46 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
     47  public:
     48   Buffer(scoped_refptr<DataSourceSender> sender,
     49          PendingSend* send,
     50          char* buffer,
     51          uint32_t buffer_size);
     52   virtual ~Buffer();
     53 
     54   // WritableBuffer overrides.
     55   virtual char* GetData() OVERRIDE;
     56   virtual uint32_t GetSize() OVERRIDE;
     57   virtual void Done(uint32_t bytes_written) OVERRIDE;
     58   virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE;
     59 
     60  private:
     61   // The DataSourceSender whose data pipe we are providing a view.
     62   scoped_refptr<DataSourceSender> sender_;
     63 
     64   // The PendingSend to which this buffer has been created in response.
     65   PendingSend* pending_send_;
     66 
     67   char* buffer_;
     68   uint32_t buffer_size_;
     69 };
     70 
     71 DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
     72                                    const ErrorCallback& error_callback)
     73     : ready_callback_(ready_callback),
     74       error_callback_(error_callback),
     75       bytes_sent_(0),
     76       shut_down_(false) {
     77   DCHECK(!ready_callback.is_null() && !error_callback.is_null());
     78 }
     79 
     80 void DataSourceSender::ShutDown() {
     81   shut_down_ = true;
     82   waiter_.reset();
     83   ready_callback_.Reset();
     84   error_callback_.Reset();
     85 }
     86 
     87 DataSourceSender::~DataSourceSender() {
     88   DCHECK(shut_down_);
     89 }
     90 
     91 void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
     92   // This should never occur. |handle_| is only valid and |pending_send_| is
     93   // only set after Init is called.
     94   if (pending_send_ || handle_.is_valid() || shut_down_) {
     95     DispatchFatalError();
     96     return;
     97   }
     98   handle_ = handle.Pass();
     99   pending_send_.reset(new PendingSend(this, ready_callback_));
    100   StartWaiting();
    101 }
    102 
    103 void DataSourceSender::Resume() {
    104   if (pending_send_ || !handle_.is_valid()) {
    105     DispatchFatalError();
    106     return;
    107   }
    108 
    109   pending_send_.reset(new PendingSend(this, ready_callback_));
    110   StartWaiting();
    111 }
    112 
    113 void DataSourceSender::OnConnectionError() {
    114   DispatchFatalError();
    115 }
    116 
    117 void DataSourceSender::StartWaiting() {
    118   DCHECK(pending_send_ && !waiter_);
    119   waiter_.reset(
    120       new AsyncWaiter(handle_.get(),
    121                       MOJO_HANDLE_SIGNAL_WRITABLE,
    122                       base::Bind(&DataSourceSender::OnDoneWaiting, this)));
    123 }
    124 
    125 void DataSourceSender::OnDoneWaiting(MojoResult result) {
    126   DCHECK(pending_send_ && !shut_down_ && waiter_);
    127   waiter_.reset();
    128   if (result != MOJO_RESULT_OK) {
    129     DispatchFatalError();
    130     return;
    131   }
    132   void* data = NULL;
    133   uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
    134   result = mojo::BeginWriteDataRaw(
    135       handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
    136   if (result != MOJO_RESULT_OK) {
    137     DispatchFatalError();
    138     return;
    139   }
    140   pending_send_->GetData(static_cast<char*>(data), num_bytes);
    141 }
    142 
    143 void DataSourceSender::Done(uint32_t bytes_written) {
    144   DoneInternal(bytes_written);
    145   if (!shut_down_)
    146     StartWaiting();
    147 }
    148 
    149 void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
    150   DoneInternal(bytes_written);
    151   pending_send_.reset();
    152   if (!shut_down_)
    153     client()->OnError(bytes_sent_, error);
    154   // We don't call StartWaiting here so we don't send any additional data until
    155   // Resume() is called.
    156 }
    157 
    158 void DataSourceSender::DoneInternal(uint32_t bytes_written) {
    159   DCHECK(pending_send_);
    160   if (shut_down_)
    161     return;
    162 
    163   bytes_sent_ += bytes_written;
    164   MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
    165   if (result != MOJO_RESULT_OK) {
    166     DispatchFatalError();
    167     return;
    168   }
    169 }
    170 
    171 void DataSourceSender::DispatchFatalError() {
    172   if (shut_down_)
    173     return;
    174 
    175   error_callback_.Run();
    176   ShutDown();
    177 }
    178 
    179 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
    180                                            const ReadyCallback& callback)
    181     : sender_(sender), callback_(callback), buffer_in_use_(false) {
    182 }
    183 
    184 void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
    185   DCHECK(!buffer_in_use_);
    186   buffer_in_use_ = true;
    187   callback_.Run(scoped_ptr<WritableBuffer>(
    188       new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
    189 }
    190 
    191 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
    192   DCHECK(buffer_in_use_);
    193   buffer_in_use_ = false;
    194   sender_->Done(bytes_written);
    195 }
    196 
    197 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
    198                                                   int32_t error) {
    199   DCHECK(buffer_in_use_);
    200   buffer_in_use_ = false;
    201   sender_->DoneWithError(bytes_written, error);
    202 }
    203 
    204 DataSourceSender::PendingSend::Buffer::Buffer(
    205     scoped_refptr<DataSourceSender> sender,
    206     PendingSend* send,
    207     char* buffer,
    208     uint32_t buffer_size)
    209     : sender_(sender),
    210       pending_send_(send),
    211       buffer_(buffer),
    212       buffer_size_(buffer_size) {
    213 }
    214 
    215 DataSourceSender::PendingSend::Buffer::~Buffer() {
    216   if (sender_.get())
    217     pending_send_->Done(0);
    218 }
    219 
    220 char* DataSourceSender::PendingSend::Buffer::GetData() {
    221   return buffer_;
    222 }
    223 
    224 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
    225   return buffer_size_;
    226 }
    227 
    228 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
    229   DCHECK(sender_.get());
    230   pending_send_->Done(bytes_written);
    231   sender_ = NULL;
    232   pending_send_ = NULL;
    233   buffer_ = NULL;
    234   buffer_size_ = 0;
    235 }
    236 
    237 void DataSourceSender::PendingSend::Buffer::DoneWithError(
    238     uint32_t bytes_written,
    239     int32_t error) {
    240   DCHECK(sender_.get());
    241   pending_send_->DoneWithError(bytes_written, error);
    242   sender_ = NULL;
    243   pending_send_ = NULL;
    244   buffer_ = NULL;
    245   buffer_size_ = 0;
    246 }
    247 
    248 }  // namespace device
    249