Home | History | Annotate | Download | only in network
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/services/public/cpp/network/web_socket_write_queue.h"
      6 
      7 #include "base/bind.h"
      8 
      9 namespace mojo {
     10 
     11 struct WebSocketWriteQueue::Operation {
     12   uint32_t num_bytes_;
     13   base::Callback<void(const char*)> callback_;
     14 
     15   const char* data_;
     16   // Only initialized if the initial Write fails. This saves a copy in
     17   // the common case.
     18   std::vector<char> data_copy_;
     19 };
     20 
     21 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
     22     : handle_(handle), is_waiting_(false) {
     23 }
     24 
     25 WebSocketWriteQueue::~WebSocketWriteQueue() {
     26 }
     27 
     28 void WebSocketWriteQueue::Write(const char* data,
     29                                 uint32_t num_bytes,
     30                                 base::Callback<void(const char*)> callback) {
     31   Operation* op = new Operation;
     32   op->num_bytes_ = num_bytes;
     33   op->callback_ = callback;
     34   op->data_ = data;
     35   queue_.push_back(op);
     36 
     37   MojoResult result = MOJO_RESULT_SHOULD_WAIT;
     38   if (!is_waiting_)
     39     result = TryToWrite();
     40 
     41   // If we have to wait, make a local copy of the data so we know it will
     42   // live until we need it.
     43   if (result == MOJO_RESULT_SHOULD_WAIT) {
     44     op->data_copy_.resize(num_bytes);
     45     memcpy(&op->data_copy_[0], data, num_bytes);
     46     op->data_ = &op->data_copy_[0];
     47   }
     48 }
     49 
     50 MojoResult WebSocketWriteQueue::TryToWrite() {
     51   Operation* op = queue_[0];
     52   uint32_t bytes_written = op->num_bytes_;
     53   MojoResult result = WriteDataRaw(
     54       handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
     55   if (result == MOJO_RESULT_SHOULD_WAIT) {
     56     Wait();
     57     return result;
     58   }
     59 
     60   // Ensure |op| is deleted, whether or not |this| goes away.
     61   scoped_ptr<Operation> op_deleter(op);
     62   queue_.weak_erase(queue_.begin());
     63   if (result != MOJO_RESULT_OK)
     64     return result;
     65 
     66   op->callback_.Run(op->data_);  // may delete |this|
     67   return result;
     68 }
     69 
     70 void WebSocketWriteQueue::Wait() {
     71   is_waiting_ = true;
     72   handle_watcher_.Start(handle_,
     73                         MOJO_HANDLE_SIGNAL_WRITABLE,
     74                         MOJO_DEADLINE_INDEFINITE,
     75                         base::Bind(&WebSocketWriteQueue::OnHandleReady,
     76                                    base::Unretained(this)));
     77 }
     78 
     79 void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
     80   is_waiting_ = false;
     81   TryToWrite();
     82 }
     83 
     84 }  // namespace mojo
     85