Home | History | Annotate | Download | only in resources
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 define('data_sender', [
      6     'async_waiter',
      7     'device/serial/data_stream.mojom',
      8     'device/serial/data_stream_serialization.mojom',
      9     'mojo/public/js/bindings/core',
     10     'mojo/public/js/bindings/router',
     11 ], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) {
     12   /**
     13    * @module data_sender
     14    */
     15 
     16   /**
     17    * A pending send operation.
     18    * @param {!ArrayBuffer} data The data to be sent.
     19    * @constructor
     20    * @alias module:data_sender~PendingSend
     21    * @private
     22    */
     23   function PendingSend(data) {
     24     /**
     25      * The remaining data to be sent.
     26      * @type {!ArrayBuffer}
     27      * @private
     28      */
     29     this.data_ = data;
     30     /**
     31      * The total length of data to be sent.
     32      * @type {number}
     33      * @private
     34      */
     35     this.length_ = data.byteLength;
     36     /**
     37      * The number of bytes that have been received by the DataSink.
     38      * @type {number}
     39      * @private
     40      */
     41     this.bytesReceivedBySink_ = 0;
     42     /**
     43      * The promise that will be resolved or rejected when this send completes
     44      * or fails, respectively.
     45      * @type {!Promise.<number>}
     46      * @private
     47      */
     48     this.promise_ = new Promise(function(resolve, reject) {
     49       /**
     50        * The callback to call on success.
     51        * @type {Function}
     52        * @private
     53        */
     54       this.successCallback_ = resolve;
     55       /**
     56        * The callback to call with the error on failure.
     57        * @type {Function}
     58        * @private
     59        */
     60       this.errorCallback_ = reject;
     61     }.bind(this));
     62   }
     63 
     64   /**
     65    * Returns the promise that will be resolved when this operation completes or
     66    * rejected if an error occurs.
     67    * @return {!Promise.<number>} A promise to the number of bytes sent.
     68    */
     69   PendingSend.prototype.getPromise = function() {
     70     return this.promise_;
     71   };
     72 
     73   /**
     74    * @typedef module:data_sender~PendingSend.ReportBytesResult
     75    * @property {number} bytesUnreported The number of bytes reported that were
     76    *     not part of the send.
     77    * @property {boolean} done Whether this send has completed.
     78    * @property {?number} bytesToFlush The number of bytes to flush in the event
     79    *     of an error.
     80    */
     81 
     82   /**
     83    * Invoked when the DataSink reports that bytes have been sent. Resolves the
     84    * promise returned by
     85    * [getPromise()]{@link module:data_sender~PendingSend#getPromise} once all
     86    * bytes have been reported as sent.
     87    * @param {number} numBytes The number of bytes sent.
     88    * @return {!module:data_sender~PendingSend.ReportBytesResult}
     89    */
     90   PendingSend.prototype.reportBytesSent = function(numBytes) {
     91     var result = this.reportBytesSentInternal_(numBytes);
     92     if (this.bytesReceivedBySink_ == this.length_) {
     93       result.done = true;
     94       this.successCallback_(this.bytesReceivedBySink_);
     95     }
     96     return result;
     97   };
     98 
     99   /**
    100    * Invoked when the DataSink reports an error. Rejects the promise returned by
    101    * [getPromise()]{@link module:data_sender~PendingSend#getPromise} unless the
    102    * error occurred after this send, that is, unless numBytes is greater than
    103    * the nubmer of outstanding bytes.
    104    * @param {number} numBytes The number of bytes sent.
    105    * @param {number} error The error reported by the DataSink.
    106    * @return {!module:data_sender~PendingSend.ReportBytesResult}
    107    */
    108   PendingSend.prototype.reportBytesSentAndError = function(numBytes, error) {
    109     var result = this.reportBytesSentInternal_(numBytes);
    110     // If there are remaining bytes to report, the error occurred after this
    111     // PendingSend so we should report success.
    112     if (result.bytesUnreported > 0) {
    113       this.successCallback_(this.bytesReceivedBySink_);
    114       result.bytesToFlush = 0;
    115       return result;
    116     }
    117 
    118     var e = new Error();
    119     e.error = error;
    120     e.bytesSent = this.bytesReceivedBySink_;
    121     this.errorCallback_(e);
    122     this.done = true;
    123     result.bytesToFlush =
    124         this.length_ - this.data_.byteLength - this.bytesReceivedBySink_;
    125     return result;
    126   };
    127 
    128   /**
    129    * Updates the internal state in response to a report from the DataSink.
    130    * @param {number} numBytes The number of bytes sent.
    131    * @return {!module:data_sender~PendingSend.ReportBytesResult}
    132    * @private
    133    */
    134   PendingSend.prototype.reportBytesSentInternal_ = function(numBytes) {
    135     this.bytesReceivedBySink_ += numBytes;
    136     var result = {bytesUnreported: 0};
    137     if (this.bytesReceivedBySink_ > this.length_) {
    138       result.bytesUnreported = this.bytesReceivedBySink_ - this.length_;
    139       this.bytesReceivedBySink_ = this.length_;
    140     }
    141     result.done = false;
    142     return result;
    143   };
    144 
    145   /**
    146    * Writes pending data into the data pipe.
    147    * @param {!MojoHandle} handle The handle to the data pipe.
    148    * @return {number} The Mojo result corresponding to the outcome:
    149    *     <ul>
    150    *     <li>RESULT_OK if the write completes successfully;
    151    *     <li>RESULT_SHOULD_WAIT if some, but not all data was written; or
    152    *     <li>the data pipe error if the write failed.
    153    *     </ul>
    154    */
    155   PendingSend.prototype.sendData = function(handle) {
    156     var result = core.writeData(
    157         handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE);
    158     if (result.result != core.RESULT_OK)
    159       return result.result;
    160     this.data_ = this.data_.slice(result.numBytes);
    161     return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK;
    162   };
    163 
    164   /**
    165    * A DataSender that sends data to a DataSink.
    166    * @param {!MojoHandle} handle The handle to the DataSink.
    167    * @param {number} bufferSize How large a buffer the data pipe should use.
    168    * @param {number} fatalErrorValue The send error value to report in the
    169    *     event of a fatal error.
    170    * @constructor
    171    * @alias module:data_sender.DataSender
    172    */
    173   function DataSender(handle, bufferSize, fatalErrorValue) {
    174     var dataPipeOptions = {
    175       flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
    176       elementNumBytes: 1,
    177       capacityNumBytes: bufferSize,
    178     };
    179     var sendPipe = core.createDataPipe(dataPipeOptions);
    180     this.init_(handle, sendPipe.producerHandle, fatalErrorValue);
    181     this.sink_.init(sendPipe.consumerHandle);
    182   }
    183 
    184   DataSender.prototype =
    185       $Object.create(dataStreamMojom.DataSinkClientStub.prototype);
    186 
    187   /**
    188    * Closes this DataSender.
    189    */
    190   DataSender.prototype.close = function() {
    191     if (this.shutDown_)
    192       return;
    193     this.shutDown_ = true;
    194     this.waiter_.stop();
    195     this.router_.close();
    196     core.close(this.sendPipe_);
    197     while (this.pendingSends_.length) {
    198       this.pendingSends_.pop().reportBytesSentAndError(
    199           0, this.fatalErrorValue_);
    200     }
    201     while (this.sendsAwaitingAck_.length) {
    202       this.sendsAwaitingAck_.pop().reportBytesSentAndError(
    203           0, this.fatalErrorValue_);
    204     }
    205     this.callCancelCallback_();
    206   };
    207 
    208   /**
    209    * Initialize this DataSender.
    210    * @param {!MojoHandle} sink A handle to the DataSink
    211    * @param {!MojoHandle} dataPipe A handle to use for sending data to the
    212    *     DataSink.
    213    * @param {number} fatalErrorValue The error to dispatch in the event of a
    214    *     fatal error.
    215    * @private
    216    */
    217   DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) {
    218     /**
    219      * The handle to the data pipe to use for sending data.
    220      * @private
    221      */
    222     this.sendPipe_ = dataPipe;
    223     /**
    224      * The error to be dispatched in the event of a fatal error.
    225      * @const {number}
    226      * @private
    227      */
    228     this.fatalErrorValue_ = fatalErrorValue;
    229     /**
    230      * Whether this DataSender has shut down.
    231      * @type {boolean}
    232      * @private
    233      */
    234     this.shutDown_ = false;
    235     /**
    236      * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
    237      * connection to the DataSink.
    238      * @private
    239      */
    240     this.router_ = new routerModule.Router(sink);
    241     /**
    242      * The connection to the DataSink.
    243      * @private
    244      */
    245     this.sink_ = new dataStreamMojom.DataSinkProxy(this.router_);
    246     this.router_.setIncomingReceiver(this);
    247     /**
    248      * The async waiter used to wait for
    249      * {@link module:data_sender.DataSender#sendPipe_} to be writable.
    250      * @type {!module:async_waiter.AsyncWaiter}
    251      * @private
    252      */
    253     this.waiter_ = new asyncWaiter.AsyncWaiter(
    254         this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE,
    255         this.onHandleReady_.bind(this));
    256     /**
    257      * A queue of sends that have not fully written their data to the data pipe.
    258      * @type {!module:data_sender~PendingSend[]}
    259      * @private
    260      */
    261     this.pendingSends_ = [];
    262     /**
    263      * A queue of sends that have written their data to the data pipe, but have
    264      * not been received by the DataSink.
    265      * @type {!module:data_sender~PendingSend[]}
    266      * @private
    267      */
    268     this.sendsAwaitingAck_ = [];
    269 
    270     /**
    271      * The callback that will resolve a pending cancel if one is in progress.
    272      * @type {?Function}
    273      * @private
    274      */
    275     this.pendingCancel_ = null;
    276 
    277     /**
    278      * The promise that will be resolved when a pending cancel completes if one
    279      * is in progress.
    280      * @type {Promise}
    281      * @private
    282      */
    283     this.cancelPromise_ = null;
    284   };
    285 
    286   /**
    287    * Serializes this DataSender.
    288    * This will cancel any sends in progress before the returned promise
    289    * resolves.
    290    * @return {!Promise.<SerializedDataSender>} A promise that will resolve to
    291    *     the serialization of this DataSender. If this DataSender has shut down,
    292    *     the promise will resolve to null.
    293    */
    294   DataSender.prototype.serialize = function() {
    295     if (this.shutDown_)
    296       return Promise.resolve(null);
    297 
    298     var readyToSerialize = Promise.resolve();
    299     if (this.pendingSends_.length) {
    300       if (this.pendingCancel_)
    301         readyToSerialize = this.cancelPromise_;
    302       else
    303         readyToSerialize = this.cancel(this.fatalErrorValue_);
    304     }
    305     return readyToSerialize.then(function() {
    306       this.waiter_.stop();
    307       var serialized = new serialization.SerializedDataSender();
    308       serialized.sink = this.router_.connector_.handle_,
    309       serialized.data_pipe = this.sendPipe_,
    310       serialized.fatal_error_value = this.fatalErrorValue_,
    311       this.router_.connector_.handle_ = null;
    312       this.router_.close();
    313       this.shutDown_ = true;
    314       return serialized;
    315     }.bind(this));
    316   };
    317 
    318   /**
    319    * Deserializes a SerializedDataSender.
    320    * @param {SerializedDataSender} serialized The serialized DataSender.
    321    * @return {!DataSender} The deserialized DataSender.
    322    */
    323   DataSender.deserialize = function(serialized) {
    324     var sender = $Object.create(DataSender.prototype);
    325     sender.deserialize_(serialized);
    326     return sender;
    327   };
    328 
    329   /**
    330    * Deserializes a SerializedDataSender into this DataSender.
    331    * @param {SerializedDataSender} serialized The serialized DataSender.
    332    * @private
    333    */
    334   DataSender.prototype.deserialize_ = function(serialized) {
    335     if (!serialized) {
    336       this.shutDown_ = true;
    337       return;
    338     }
    339     this.init_(
    340         serialized.sink, serialized.data_pipe, serialized.fatal_error_value);
    341   };
    342 
    343   /**
    344    * Sends data to the DataSink.
    345    * @return {!Promise.<number>} A promise to the number of bytes sent. If an
    346    *     error occurs, the promise will reject with an Error object with a
    347    *     property error containing the error code.
    348    * @throws Will throw if this has encountered a fatal error or a cancel is in
    349    *     progress.
    350    */
    351   DataSender.prototype.send = function(data) {
    352     if (this.shutDown_)
    353       throw new Error('DataSender has been closed');
    354     if (this.pendingCancel_)
    355       throw new Error('Cancel in progress');
    356     var send = new PendingSend(data);
    357     this.pendingSends_.push(send);
    358     if (!this.waiter_.isWaiting())
    359       this.waiter_.start();
    360     return send.getPromise();
    361   };
    362 
    363   /**
    364    * Requests the cancellation of any in-progress sends. Calls to
    365    * [send()]{@link module:data_sender.DataSender#send} will fail until the
    366    * cancel has completed.
    367    * @param {number} error The error to report for cancelled sends.
    368    * @return {!Promise} A promise that will resolve when the cancel completes.
    369    * @throws Will throw if this has encountered a fatal error or another cancel
    370    *     is in progress.
    371    */
    372   DataSender.prototype.cancel = function(error) {
    373     if (this.shutDown_)
    374       throw new Error('DataSender has been closed');
    375     if (this.pendingCancel_)
    376       throw new Error('Cancel already in progress');
    377     if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
    378       return Promise.resolve();
    379 
    380     this.sink_.cancel(error);
    381     this.cancelPromise_ = new Promise(function(resolve) {
    382       this.pendingCancel_ = resolve;
    383     }.bind(this));
    384     return this.cancelPromise_;
    385   };
    386 
    387   /**
    388    * Invoked when
    389    * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to
    390    * write. Writes to the data pipe if the wait is successful.
    391    * @param {number} waitResult The result of the asynchronous wait.
    392    * @private
    393    */
    394   DataSender.prototype.onHandleReady_ = function(result) {
    395     if (result != core.RESULT_OK) {
    396       this.close();
    397       return;
    398     }
    399     while (this.pendingSends_.length) {
    400       var result = this.pendingSends_[0].sendData(this.sendPipe_);
    401       if (result == core.RESULT_OK) {
    402         this.sendsAwaitingAck_.push(this.pendingSends_.shift());
    403       } else if (result == core.RESULT_SHOULD_WAIT) {
    404         this.waiter_.start();
    405         return;
    406       } else {
    407         this.close();
    408         return;
    409       }
    410     }
    411   };
    412 
    413   /**
    414    * Calls and clears the pending cancel callback if one is pending.
    415    * @private
    416    */
    417   DataSender.prototype.callCancelCallback_ = function() {
    418     if (this.pendingCancel_) {
    419       this.cancelPromise_ = null;
    420       this.pendingCancel_();
    421       this.pendingCancel_ = null;
    422     }
    423   };
    424 
    425   /**
    426    * Invoked by the DataSink to report that data has been successfully sent.
    427    * @param {number} numBytes The number of bytes sent.
    428    * @private
    429    */
    430   DataSender.prototype.reportBytesSent = function(numBytes) {
    431     while (numBytes > 0 && this.sendsAwaitingAck_.length) {
    432       var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes);
    433       numBytes = result.bytesUnreported;
    434       if (result.done)
    435         this.sendsAwaitingAck_.shift();
    436     }
    437     if (numBytes > 0 && this.pendingSends_.length) {
    438       var result = this.pendingSends_[0].reportBytesSent(numBytes);
    439       numBytes = result.bytesUnreported;
    440     }
    441     // A cancel is completed when all of the sends that were in progress have
    442     // completed or failed. This is the case where all sends complete
    443     // successfully.
    444     if (this.pendingSends_.length + this.sendsAwaitingAck_.length == 0)
    445       this.callCancelCallback_();
    446   };
    447 
    448   /**
    449    * Invoked by the DataSink to report an error in sending data.
    450    * @param {number} numBytes The number of bytes sent.
    451    * @param {number} error The error reported by the DataSink.
    452    * @private
    453    */
    454   DataSender.prototype.reportBytesSentAndError = function(numBytes, error) {
    455     var bytesToFlush = 0;
    456     while (this.sendsAwaitingAck_.length) {
    457       var result = this.sendsAwaitingAck_[0].reportBytesSentAndError(
    458           numBytes, error);
    459       numBytes = result.bytesUnreported;
    460       this.sendsAwaitingAck_.shift();
    461       bytesToFlush += result.bytesToFlush;
    462     }
    463     while (this.pendingSends_.length) {
    464       var result = this.pendingSends_[0].reportBytesSentAndError(
    465           numBytes, error);
    466       numBytes = result.bytesUnreported;
    467       this.pendingSends_.shift();
    468       // Note: Only the first PendingSend in |pendingSends_| will have data to
    469       // flush as only the first can have written data to the data pipe.
    470       bytesToFlush += result.bytesToFlush;
    471     }
    472     this.callCancelCallback_();
    473     return Promise.resolve({bytes_to_flush: bytesToFlush});
    474   };
    475 
    476   return {DataSender: DataSender};
    477 });
    478