Home | History | Annotate | Download | only in protocol
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
      6 #define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
      7 
      8 #include <list>
      9 
     10 #include "base/callback.h"
     11 #include "base/synchronization/lock.h"
     12 #include "base/threading/non_thread_safe.h"
     13 #include "net/base/io_buffer.h"
     14 #include "net/socket/socket.h"
     15 
     16 namespace net {
     17 class Socket;
     18 }  // namespace net
     19 
     20 namespace remoting {
     21 namespace protocol {
     22 
     23 // BufferedSocketWriter and BufferedDatagramWriter implement a write data
     24 // queue for stream and datagram sockets. BufferedSocketWriterBase is the
     25 // base class that implements the functionality common to streams and
     26 // datagrams. These classes are particularly useful when data comes from a
     27 // thread that doesn't own the socket, as Write() can be called from any
     28 // thread. Whenever new data is written it is just put in the queue, and
     29 // then written on the thread that owns the socket. GetBufferChunks() and
     30 // GetBufferSize() can be used to throttle writes.
     31 
     32 class BufferedSocketWriterBase : public base::NonThreadSafe {
          // Abstract base: declares the public API and the queue state, and leaves
          // GetNextPacket(), AdvanceBufferPosition() and OnError() to subclasses.
     33  public:
          // Callback type passed to Init(); it receives a single int.
          // NOTE(review): presumably the int is the failed write's result code
          // (compare OnError(int result)) -- confirm in the .cc file.
     34   typedef base::Callback<void(int)> WriteFailedCallback;
     35 
     36   BufferedSocketWriterBase();
     37   virtual ~BufferedSocketWriterBase();
     38 
     39   // Initializes the writer. Must be called on the thread that will be used
     40   // to access the socket in the future. |callback| will be called after each
     41   // failed write. Caller retains ownership of |socket|.
     42   // TODO(sergeyu): Change it so that it takes ownership of |socket|.
     43   void Init(net::Socket* socket, const WriteFailedCallback& callback);
     44 
     45   // Puts a new data chunk in the buffer. Returns false and doesn't enqueue
     46   // the data if called before Init(). Can be called on any thread.
          // |done_task| is the closure that PopQueue() later returns for this
          // chunk when it is removed from the queue.
          // NOTE(review): this class derives from base::NonThreadSafe and declares
          // no lock member; confirm that the "any thread" statements in this
          // header are still accurate.
     47   bool Write(scoped_refptr<net::IOBufferWithSize> buffer,
     48              const base::Closure& done_task);
     49 
     50   // Returns current size of the buffer. Can be called on any thread.
          // NOTE(review): presumably measured in bytes -- confirm in the .cc file.
     51   int GetBufferSize();
     52 
     53   // Returns number of chunks that are currently in the buffer waiting
     54   // to be written. Can be called on any thread.
     55   int GetBufferChunks();
     56 
     57   // Stops writing and drops current buffers. Must be called on the
     58   // network thread.
     59   void Close();
     60 
     61  protected:
          // Queue entry type. Only forward-declared here; the definition is not in
          // this header.
     62   struct PendingPacket;
          // NOTE(review): elements are raw pointers; who deletes them is not
          // visible in this header -- confirm in the .cc file.
     63   typedef std::list<PendingPacket*> DataQueue;
     64 
          // Packets waiting to be written; PopQueue() removes from the front.
     65   DataQueue queue_;
          // NOTE(review): presumably the value reported by GetBufferSize() --
          // confirm in the .cc file.
     66   int buffer_size_;
     67 
     68   // Removes the element at the front of the queue and returns |done_task|
     69   // for that element. Called from AdvanceBufferPosition() implementations,
     70   // which then return the result of this function to their caller.
     71   base::Closure PopQueue();
     72 
     73   // The following three methods must be implemented in child classes.
     74 
     75   // Returns the next packet that needs to be written to the socket.
     76   // Implementations must set |*buffer| to NULL if the queue is empty.
     77   virtual void GetNextPacket(net::IOBuffer** buffer, int* size) = 0;
     78 
     79   // Returns the closure that must be executed, or a null closure if the last
     80   // write didn't complete any messages.
     81   virtual base::Closure AdvanceBufferPosition(int written) = 0;
     82 
     83   // This method is called whenever there is an error writing to the socket.
     84   virtual void OnError(int result) = 0;
     85 
     86  private:
          // NOTE(review): the three helpers below are undocumented in this header;
          // judging by their names they drive the socket write loop -- confirm in
          // the .cc file.
     87   void DoWrite();
     88   void HandleWriteResult(int result, bool* write_again);
     89   void OnWritten(int result);
     90 
     91   // This method is called when an error is encountered.
     92   void HandleError(int result);
     93 
          // NOTE(review): presumably the |socket| given to Init(), which the
          // Init() comment says remains owned by the caller -- confirm.
     94   net::Socket* socket_;
          // NOTE(review): presumably the |callback| given to Init() -- confirm.
     95   WriteFailedCallback write_failed_callback_;
     96 
          // NOTE(review): presumably true while a socket write is outstanding --
          // confirm in the .cc file.
     97   bool write_pending_;
     98 
          // NOTE(review): presumably set by Close() -- confirm in the .cc file.
     99   bool closed_;
    100 
          // NOTE(review): raw pointer to a bool; presumably used to detect that
          // this object was destroyed while a callback was running -- confirm in
          // the .cc file.
    101   bool* destroyed_flag_;
    102 };
    103 
    104 class BufferedSocketWriter : public BufferedSocketWriterBase {
          // Concrete writer that overrides the three BufferedSocketWriterBase
          // hooks. The overview comment at the top of this header lists it as the
          // writer for stream sockets.
    105  public:
    106   BufferedSocketWriter();
    107   virtual ~BufferedSocketWriter();
    108 
    109  protected:
          // Overrides of the pure-virtual hooks declared in the base class.
    110   virtual void GetNextPacket(net::IOBuffer** buffer, int* size) OVERRIDE;
    111   virtual base::Closure AdvanceBufferPosition(int written) OVERRIDE;
    112   virtual void OnError(int result) OVERRIDE;
    113 
    114  private:
          // NOTE(review): presumably the chunk currently being written; the
          // DrainableIOBuffer type suggests partially written data is tracked
          // here -- confirm in the .cc file.
    115   scoped_refptr<net::DrainableIOBuffer> current_buf_;
    116 };
    117 
    118 class BufferedDatagramWriter : public BufferedSocketWriterBase {
          // Concrete writer that overrides the three BufferedSocketWriterBase
          // hooks and adds no data members of its own. The overview comment at the
          // top of this header lists it as the writer for datagram sockets.
    119  public:
    120   BufferedDatagramWriter();
    121   virtual ~BufferedDatagramWriter();
    122 
    123  protected:
          // Overrides of the pure-virtual hooks declared in the base class.
    124   virtual void GetNextPacket(net::IOBuffer** buffer, int* size) OVERRIDE;
    125   virtual base::Closure AdvanceBufferPosition(int written) OVERRIDE;
    126   virtual void OnError(int result) OVERRIDE;
    127 };
    128 
    129 }  // namespace protocol
    130 }  // namespace remoting
    131 
    132 #endif  // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
    133