// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

define('data_sender', [
    'async_waiter',
    'device/serial/data_stream.mojom',
    'device/serial/data_stream_serialization.mojom',
    'mojo/public/js/bindings/core',
    'mojo/public/js/bindings/router',
], function(asyncWaiter, dataStreamMojom, serialization, core, routerModule) {
  /**
   * @module data_sender
   */

  /**
   * A pending send operation.
   * @param {!ArrayBuffer} data The data to be sent.
   * @constructor
   * @alias module:data_sender~PendingSend
   * @private
   */
  function PendingSend(data) {
    /**
     * The remaining data to be sent.
     * @type {!ArrayBuffer}
     * @private
     */
    this.data_ = data;
    /**
     * The total length of data to be sent.
     * @type {number}
     * @private
     */
    this.length_ = data.byteLength;
    /**
     * The number of bytes that have been received by the DataSink.
     * @type {number}
     * @private
     */
    this.bytesReceivedBySink_ = 0;
    /**
     * The promise that will be resolved or rejected when this send completes
     * or fails, respectively.
     * @type {!Promise.<number>}
     * @private
     */
    this.promise_ = new Promise(function(resolve, reject) {
      /**
       * The callback to call on success.
       * @type {Function}
       * @private
       */
      this.successCallback_ = resolve;
      /**
       * The callback to call with the error on failure.
       * @type {Function}
       * @private
       */
      this.errorCallback_ = reject;
    }.bind(this));
  }

  /**
   * Returns the promise that will be resolved when this operation completes or
   * rejected if an error occurs.
   * @return {!Promise.<number>} A promise to the number of bytes sent.
   */
  PendingSend.prototype.getPromise = function() {
    return this.promise_;
  };

  /**
   * @typedef module:data_sender~PendingSend.ReportBytesResult
   * @property {number} bytesUnreported The number of bytes reported that were
   *     not part of the send.
   * @property {boolean} done Whether this send has completed.
   * @property {?number} bytesToFlush The number of bytes to flush in the event
   *     of an error.
   */

  /**
   * Invoked when the DataSink reports that bytes have been sent. Resolves the
   * promise returned by
   * [getPromise()]{@link module:data_sender~PendingSend#getPromise} once all
   * bytes have been reported as sent.
   * @param {number} numBytes The number of bytes sent.
   * @return {!module:data_sender~PendingSend.ReportBytesResult}
   */
  PendingSend.prototype.reportBytesSent = function(numBytes) {
    var result = this.reportBytesSentInternal_(numBytes);
    if (this.bytesReceivedBySink_ === this.length_) {
      result.done = true;
      this.successCallback_(this.bytesReceivedBySink_);
    }
    return result;
  };

  /**
   * Invoked when the DataSink reports an error. Rejects the promise returned by
   * [getPromise()]{@link module:data_sender~PendingSend#getPromise} unless the
   * error occurred after this send, that is, unless numBytes is greater than
   * the number of outstanding bytes. Either way this send is settled, so the
   * returned result always has done set to true.
   * @param {number} numBytes The number of bytes sent.
   * @param {number} error The error reported by the DataSink.
   * @return {!module:data_sender~PendingSend.ReportBytesResult}
   */
  PendingSend.prototype.reportBytesSentAndError = function(numBytes, error) {
    var result = this.reportBytesSentInternal_(numBytes);
    // The promise is settled on both paths below, so the send is finished.
    // This is recorded on the result rather than on the PendingSend itself.
    result.done = true;
    // If there are remaining bytes to report, the error occurred after this
    // PendingSend so we should report success.
    if (result.bytesUnreported > 0) {
      this.successCallback_(this.bytesReceivedBySink_);
      result.bytesToFlush = 0;
      return result;
    }

    var e = new Error();
    e.error = error;
    e.bytesSent = this.bytesReceivedBySink_;
    this.errorCallback_(e);
    // Bytes already written to the data pipe (total length minus what is still
    // held in data_) that the DataSink has not acknowledged must be flushed.
    result.bytesToFlush =
        this.length_ - this.data_.byteLength - this.bytesReceivedBySink_;
    return result;
  };

  /**
   * Updates the internal state in response to a report from the DataSink.
   * Any bytes reported beyond the length of this send are handed back through
   * bytesUnreported so the caller can apply them to the next send.
   * @param {number} numBytes The number of bytes sent.
   * @return {!module:data_sender~PendingSend.ReportBytesResult}
   * @private
   */
  PendingSend.prototype.reportBytesSentInternal_ = function(numBytes) {
    this.bytesReceivedBySink_ += numBytes;
    var result = {bytesUnreported: 0};
    if (this.bytesReceivedBySink_ > this.length_) {
      result.bytesUnreported = this.bytesReceivedBySink_ - this.length_;
      this.bytesReceivedBySink_ = this.length_;
    }
    result.done = false;
    return result;
  };

  /**
   * Writes pending data into the data pipe.
   * @param {!MojoHandle} handle The handle to the data pipe.
   * @return {number} The Mojo result corresponding to the outcome:
   * <ul>
   * <li>RESULT_OK if the write completes successfully;
   * <li>RESULT_SHOULD_WAIT if some, but not all data was written; or
   * <li>the data pipe error if the write failed.
   * </ul>
   */
  PendingSend.prototype.sendData = function(handle) {
    var result = core.writeData(
        handle, new Int8Array(this.data_), core.WRITE_DATA_FLAG_NONE);
    if (result.result !== core.RESULT_OK)
      return result.result;
    // Keep only the bytes that the pipe did not accept.
    this.data_ = this.data_.slice(result.numBytes);
    return this.data_.byteLength ? core.RESULT_SHOULD_WAIT : core.RESULT_OK;
  };

  /**
   * A DataSender that sends data to a DataSink.
   * @param {!MojoHandle} handle The handle to the DataSink.
   * @param {number} bufferSize How large a buffer the data pipe should use.
   * @param {number} fatalErrorValue The send error value to report in the
   *     event of a fatal error.
   * @constructor
   * @alias module:data_sender.DataSender
   */
  function DataSender(handle, bufferSize, fatalErrorValue) {
    var dataPipeOptions = {
      flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
      elementNumBytes: 1,
      capacityNumBytes: bufferSize,
    };
    var sendPipe = core.createDataPipe(dataPipeOptions);
    this.init_(handle, sendPipe.producerHandle, fatalErrorValue);
    this.sink_.init(sendPipe.consumerHandle);
  }

  DataSender.prototype =
      $Object.create(dataStreamMojom.DataSinkClientStub.prototype);

  /**
   * Closes this DataSender. Every outstanding send is rejected with the fatal
   * error value and any pending cancel is completed.
   */
  DataSender.prototype.close = function() {
    if (this.shutDown_)
      return;
    this.shutDown_ = true;
    this.waiter_.stop();
    this.router_.close();
    core.close(this.sendPipe_);
    while (this.pendingSends_.length) {
      this.pendingSends_.pop().reportBytesSentAndError(
          0, this.fatalErrorValue_);
    }
    while (this.sendsAwaitingAck_.length) {
      this.sendsAwaitingAck_.pop().reportBytesSentAndError(
          0, this.fatalErrorValue_);
    }
    this.callCancelCallback_();
  };

  /**
   * Initialize this DataSender.
   * @param {!MojoHandle} sink A handle to the DataSink
   * @param {!MojoHandle} dataPipe A handle to use for sending data to the
   *     DataSink.
   * @param {number} fatalErrorValue The error to dispatch in the event of a
   *     fatal error.
   * @private
   */
  DataSender.prototype.init_ = function(sink, dataPipe, fatalErrorValue) {
    /**
     * The handle to the data pipe to use for sending data.
     * @private
     */
    this.sendPipe_ = dataPipe;
    /**
     * The error to be dispatched in the event of a fatal error.
     * @const {number}
     * @private
     */
    this.fatalErrorValue_ = fatalErrorValue;
    /**
     * Whether this DataSender has shut down.
     * @type {boolean}
     * @private
     */
    this.shutDown_ = false;
    /**
     * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
     * connection to the DataSink.
     * @private
     */
    this.router_ = new routerModule.Router(sink);
    /**
     * The connection to the DataSink.
     * @private
     */
    this.sink_ = new dataStreamMojom.DataSinkProxy(this.router_);
    this.router_.setIncomingReceiver(this);
    /**
     * The async waiter used to wait for
     * {@link module:data_sender.DataSender#sendPipe_} to be writable.
     * @type {!module:async_waiter.AsyncWaiter}
     * @private
     */
    this.waiter_ = new asyncWaiter.AsyncWaiter(
        this.sendPipe_, core.HANDLE_SIGNAL_WRITABLE,
        this.onHandleReady_.bind(this));
    /**
     * A queue of sends that have not fully written their data to the data pipe.
     * @type {!module:data_sender~PendingSend[]}
     * @private
     */
    this.pendingSends_ = [];
    /**
     * A queue of sends that have written their data to the data pipe, but have
     * not been received by the DataSink.
     * @type {!module:data_sender~PendingSend[]}
     * @private
     */
    this.sendsAwaitingAck_ = [];

    /**
     * The callback that will resolve a pending cancel if one is in progress.
     * @type {?Function}
     * @private
     */
    this.pendingCancel_ = null;

    /**
     * The promise that will be resolved when a pending cancel completes if one
     * is in progress.
     * @type {Promise}
     * @private
     */
    this.cancelPromise_ = null;
  };

  /**
   * Serializes this DataSender.
   * This will cancel any sends in progress before the returned promise
   * resolves. That includes sends whose data has already been written to the
   * data pipe but not yet acknowledged by the DataSink; otherwise their
   * promises would never settle once this DataSender is shut down.
   * @return {!Promise.<SerializedDataSender>} A promise that will resolve to
   *     the serialization of this DataSender. If this DataSender has shut down,
   *     the promise will resolve to null.
   */
  DataSender.prototype.serialize = function() {
    if (this.shutDown_)
      return Promise.resolve(null);

    var readyToSerialize = Promise.resolve();
    var sendsInProgress =
        this.pendingSends_.length + this.sendsAwaitingAck_.length;
    if (sendsInProgress) {
      if (this.pendingCancel_)
        readyToSerialize = this.cancelPromise_;
      else
        readyToSerialize = this.cancel(this.fatalErrorValue_);
    }
    return readyToSerialize.then(function() {
      this.waiter_.stop();
      var serialized = new serialization.SerializedDataSender();
      serialized.sink = this.router_.connector_.handle_;
      serialized.data_pipe = this.sendPipe_;
      serialized.fatal_error_value = this.fatalErrorValue_;
      // Detach the handle from the router before closing it so that the
      // handle transferred in |serialized| is not closed along with the router.
      this.router_.connector_.handle_ = null;
      this.router_.close();
      this.shutDown_ = true;
      return serialized;
    }.bind(this));
  };

  /**
   * Deserializes a SerializedDataSender.
   * @param {SerializedDataSender} serialized The serialized DataSender.
   * @return {!DataSender} The deserialized DataSender.
   */
  DataSender.deserialize = function(serialized) {
    var sender = $Object.create(DataSender.prototype);
    sender.deserialize_(serialized);
    return sender;
  };

  /**
   * Deserializes a SerializedDataSender into this DataSender. A null
   * serialization produces a DataSender that is already shut down.
   * @param {SerializedDataSender} serialized The serialized DataSender.
   * @private
   */
  DataSender.prototype.deserialize_ = function(serialized) {
    if (!serialized) {
      this.shutDown_ = true;
      return;
    }
    this.init_(
        serialized.sink, serialized.data_pipe, serialized.fatal_error_value);
  };

  /**
   * Sends data to the DataSink.
   * @param {!ArrayBuffer} data The data to be sent.
   * @return {!Promise.<number>} A promise to the number of bytes sent. If an
   *     error occurs, the promise will reject with an Error object with a
   *     property error containing the error code.
   * @throws Will throw if this has encountered a fatal error or a cancel is in
   *     progress.
   */
  DataSender.prototype.send = function(data) {
    if (this.shutDown_)
      throw new Error('DataSender has been closed');
    if (this.pendingCancel_)
      throw new Error('Cancel in progress');
    var send = new PendingSend(data);
    this.pendingSends_.push(send);
    if (!this.waiter_.isWaiting())
      this.waiter_.start();
    return send.getPromise();
  };

  /**
   * Requests the cancellation of any in-progress sends. Calls to
   * [send()]{@link module:data_sender.DataSender#send} will fail until the
   * cancel has completed.
   * @param {number} error The error to report for cancelled sends.
   * @return {!Promise} A promise that will resolve when the cancel completes.
   * @throws Will throw if this has encountered a fatal error or another cancel
   *     is in progress.
   */
  DataSender.prototype.cancel = function(error) {
    if (this.shutDown_)
      throw new Error('DataSender has been closed');
    if (this.pendingCancel_)
      throw new Error('Cancel already in progress');
    if (this.pendingSends_.length + this.sendsAwaitingAck_.length === 0)
      return Promise.resolve();

    this.sink_.cancel(error);
    this.cancelPromise_ = new Promise(function(resolve) {
      this.pendingCancel_ = resolve;
    }.bind(this));
    return this.cancelPromise_;
  };

  /**
   * Invoked when
   * |[sendPipe_]{@link module:data_sender.DataSender#sendPipe_}| is ready to
   * write. Writes to the data pipe if the wait is successful.
   * @param {number} waitResult The result of the asynchronous wait.
   * @private
   */
  DataSender.prototype.onHandleReady_ = function(waitResult) {
    if (waitResult !== core.RESULT_OK) {
      this.close();
      return;
    }
    while (this.pendingSends_.length) {
      var sendResult = this.pendingSends_[0].sendData(this.sendPipe_);
      if (sendResult === core.RESULT_OK) {
        // Fully written: the send now only waits for the DataSink's report.
        this.sendsAwaitingAck_.push(this.pendingSends_.shift());
      } else if (sendResult === core.RESULT_SHOULD_WAIT) {
        // The pipe is full; resume once it becomes writable again.
        this.waiter_.start();
        return;
      } else {
        this.close();
        return;
      }
    }
  };

  /**
   * Calls and clears the pending cancel callback if one is pending.
   * @private
   */
  DataSender.prototype.callCancelCallback_ = function() {
    if (this.pendingCancel_) {
      this.cancelPromise_ = null;
      this.pendingCancel_();
      this.pendingCancel_ = null;
    }
  };

  /**
   * Invoked by the DataSink to report that data has been successfully sent.
   * @param {number} numBytes The number of bytes sent.
   * @private
   */
  DataSender.prototype.reportBytesSent = function(numBytes) {
    while (numBytes > 0 && this.sendsAwaitingAck_.length) {
      var result = this.sendsAwaitingAck_[0].reportBytesSent(numBytes);
      numBytes = result.bytesUnreported;
      if (result.done)
        this.sendsAwaitingAck_.shift();
    }
    // Any bytes left over belong to the send that is still being written to
    // the data pipe.
    if (numBytes > 0 && this.pendingSends_.length)
      this.pendingSends_[0].reportBytesSent(numBytes);
    // A cancel is completed when all of the sends that were in progress have
    // completed or failed. This is the case where all sends complete
    // successfully.
    if (this.pendingSends_.length + this.sendsAwaitingAck_.length === 0)
      this.callCancelCallback_();
  };

  /**
   * Invoked by the DataSink to report an error in sending data. Settles every
   * outstanding send and completes any pending cancel.
   * @param {number} numBytes The number of bytes sent.
   * @param {number} error The error reported by the DataSink.
   * @return {!Promise.<{bytes_to_flush: number}>} A promise to the number of
   *     bytes written to the data pipe that the DataSink should discard.
   * @private
   */
  DataSender.prototype.reportBytesSentAndError = function(numBytes, error) {
    var bytesToFlush = 0;
    var result;
    while (this.sendsAwaitingAck_.length) {
      result = this.sendsAwaitingAck_[0].reportBytesSentAndError(
          numBytes, error);
      numBytes = result.bytesUnreported;
      this.sendsAwaitingAck_.shift();
      bytesToFlush += result.bytesToFlush;
    }
    while (this.pendingSends_.length) {
      result = this.pendingSends_[0].reportBytesSentAndError(
          numBytes, error);
      numBytes = result.bytesUnreported;
      this.pendingSends_.shift();
      // Note: Only the first PendingSend in |pendingSends_| will have data to
      // flush as only the first can have written data to the data pipe.
      bytesToFlush += result.bytesToFlush;
    }
    this.callCancelCallback_();
    return Promise.resolve({bytes_to_flush: bytesToFlush});
  };

  return {DataSender: DataSender};
});