Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include <iomanip>
     12 
     13 #include "webrtc/base/asyncsocket.h"
     14 #include "webrtc/base/logging.h"
     15 #include "webrtc/base/socketfactory.h"
     16 #include "webrtc/base/socketpool.h"
     17 #include "webrtc/base/socketstream.h"
     18 #include "webrtc/base/thread.h"
     19 
     20 namespace rtc {
     21 
     22 ///////////////////////////////////////////////////////////////////////////////
     23 // StreamCache - Caches a set of open streams, defers creation to a separate
     24 //  StreamPool.
     25 ///////////////////////////////////////////////////////////////////////////////
     26 
     27 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) {
     28 }
     29 
     30 StreamCache::~StreamCache() {
     31   for (ConnectedList::iterator it = active_.begin(); it != active_.end();
     32        ++it) {
     33     delete it->second;
     34   }
     35   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
     36        ++it) {
     37     delete it->second;
     38   }
     39 }
     40 
     41 StreamInterface* StreamCache::RequestConnectedStream(
     42     const SocketAddress& remote, int* err) {
     43   LOG_F(LS_VERBOSE) << "(" << remote << ")";
     44   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
     45        ++it) {
     46     if (remote == it->first) {
     47       it->second->SignalEvent.disconnect(this);
     48       // Move from cached_ to active_
     49       active_.push_front(*it);
     50       cached_.erase(it);
     51       if (err)
     52         *err = 0;
     53       LOG_F(LS_VERBOSE) << "Providing cached stream";
     54       return active_.front().second;
     55     }
     56   }
     57   if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
     58     // We track active streams so that we can remember their address
     59     active_.push_front(ConnectedStream(remote, stream));
     60     LOG_F(LS_VERBOSE) << "Providing new stream";
     61     return active_.front().second;
     62   }
     63   return NULL;
     64 }
     65 
     66 void StreamCache::ReturnConnectedStream(StreamInterface* stream) {
     67   for (ConnectedList::iterator it = active_.begin(); it != active_.end();
     68        ++it) {
     69     if (stream == it->second) {
     70       LOG_F(LS_VERBOSE) << "(" << it->first << ")";
     71       if (stream->GetState() == SS_CLOSED) {
     72         // Return closed streams
     73         LOG_F(LS_VERBOSE) << "Returning closed stream";
     74         pool_->ReturnConnectedStream(it->second);
     75       } else {
     76         // Monitor open streams
     77         stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent);
     78         LOG_F(LS_VERBOSE) << "Caching stream";
     79         cached_.push_front(*it);
     80       }
     81       active_.erase(it);
     82       return;
     83     }
     84   }
     85   ASSERT(false);
     86 }
     87 
     88 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) {
     89   if ((events & SE_CLOSE) == 0) {
     90     LOG_F(LS_WARNING) << "(" << events << ", " << err
     91                       << ") received non-close event";
     92     return;
     93   }
     94   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
     95        ++it) {
     96     if (stream == it->second) {
     97       LOG_F(LS_VERBOSE) << "(" << it->first << ")";
     98       // We don't cache closed streams, so return it.
     99       it->second->SignalEvent.disconnect(this);
    100       LOG_F(LS_VERBOSE) << "Returning closed stream";
    101       pool_->ReturnConnectedStream(it->second);
    102       cached_.erase(it);
    103       return;
    104     }
    105   }
    106   ASSERT(false);
    107 }
    108 
    109 //////////////////////////////////////////////////////////////////////
    110 // NewSocketPool
    111 //////////////////////////////////////////////////////////////////////
    112 
    113 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) {
    114 }
    115 
    116 NewSocketPool::~NewSocketPool() {
    117 }
    118 
    119 StreamInterface*
    120 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
    121   AsyncSocket* socket =
    122       factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM);
    123   if (!socket) {
    124     if (err)
    125       *err = -1;
    126     return NULL;
    127   }
    128   if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) {
    129     if (err)
    130       *err = socket->GetError();
    131     delete socket;
    132     return NULL;
    133   }
    134   if (err)
    135     *err = 0;
    136   return new SocketStream(socket);
    137 }
    138 
    139 void
    140 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) {
    141   Thread::Current()->Dispose(stream);
    142 }
    143 
    144 //////////////////////////////////////////////////////////////////////
    145 // ReuseSocketPool
    146 //////////////////////////////////////////////////////////////////////
    147 
    148 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory)
    149 : factory_(factory), stream_(NULL), checked_out_(false) {
    150 }
    151 
    152 ReuseSocketPool::~ReuseSocketPool() {
    153   ASSERT(!checked_out_);
    154   delete stream_;
    155 }
    156 
    157 StreamInterface*
    158 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
    159   // Only one socket can be used from this "pool" at a time
    160   ASSERT(!checked_out_);
    161   if (!stream_) {
    162     LOG_F(LS_VERBOSE) << "Creating new socket";
    163     int family = remote.family();
    164     // TODO: Deal with this when we/I clean up DNS resolution.
    165     if (remote.IsUnresolvedIP()) {
    166       family = AF_INET;
    167     }
    168     AsyncSocket* socket =
    169         factory_->CreateAsyncSocket(family, SOCK_STREAM);
    170     if (!socket) {
    171       if (err)
    172         *err = -1;
    173       return NULL;
    174     }
    175     stream_ = new SocketStream(socket);
    176   }
    177   if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) {
    178     LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_;
    179   } else {
    180     remote_ = remote;
    181     stream_->Close();
    182     if ((stream_->GetSocket()->Connect(remote_) != 0)
    183         && !stream_->GetSocket()->IsBlocking()) {
    184       if (err)
    185         *err = stream_->GetSocket()->GetError();
    186       return NULL;
    187     } else {
    188       LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_;
    189     }
    190   }
    191   stream_->SignalEvent.disconnect(this);
    192   checked_out_ = true;
    193   if (err)
    194     *err = 0;
    195   return stream_;
    196 }
    197 
    198 void
    199 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) {
    200   ASSERT(stream == stream_);
    201   ASSERT(checked_out_);
    202   checked_out_ = false;
    203   // Until the socket is reused, monitor it to determine if it closes.
    204   stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent);
    205 }
    206 
    207 void
    208 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) {
    209   ASSERT(stream == stream_);
    210   ASSERT(!checked_out_);
    211 
    212   // If the stream was written to and then immediately returned to us then
    213   // we may get a writable notification for it, which we should ignore.
    214   if (events == SE_WRITE) {
    215     LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring";
    216     return;
    217   }
    218 
    219   // If the peer sent data, we can't process it, so drop the connection.
    220   // If the socket has closed, clean it up.
    221   // In either case, we'll reconnect it the next time it is used.
    222   ASSERT(0 != (events & (SE_READ|SE_CLOSE)));
    223   if (0 != (events & SE_CLOSE)) {
    224     LOG_F(LS_VERBOSE) << "Connection closed with error: " << err;
    225   } else {
    226     LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing";
    227   }
    228   stream_->Close();
    229 }
    230 
    231 ///////////////////////////////////////////////////////////////////////////////
    232 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached
    233 // LoggingAdapters.
    234 ///////////////////////////////////////////////////////////////////////////////
    235 
    236 LoggingPoolAdapter::LoggingPoolAdapter(
    237     StreamPool* pool, LoggingSeverity level, const std::string& label,
    238     bool binary_mode)
    239   : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) {
    240 }
    241 
    242 LoggingPoolAdapter::~LoggingPoolAdapter() {
    243   for (StreamList::iterator it = recycle_bin_.begin();
    244        it != recycle_bin_.end(); ++it) {
    245     delete *it;
    246   }
    247 }
    248 
    249 StreamInterface* LoggingPoolAdapter::RequestConnectedStream(
    250     const SocketAddress& remote, int* err) {
    251   if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
    252     ASSERT(SS_CLOSED != stream->GetState());
    253     std::stringstream ss;
    254     ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8)
    255        << stream << ")";
    256     LOG_V(level_) << ss.str()
    257                   << ((SS_OPEN == stream->GetState()) ? " Connected"
    258                                                       : " Connecting")
    259                   << " to " << remote;
    260     if (recycle_bin_.empty()) {
    261       return new LoggingAdapter(stream, level_, ss.str(), binary_mode_);
    262     }
    263     LoggingAdapter* logging = recycle_bin_.front();
    264     recycle_bin_.pop_front();
    265     logging->set_label(ss.str());
    266     logging->Attach(stream);
    267     return logging;
    268   }
    269   return NULL;
    270 }
    271 
    272 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) {
    273   LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream);
    274   pool_->ReturnConnectedStream(logging->Detach());
    275   recycle_bin_.push_back(logging);
    276 }
    277 
    278 ///////////////////////////////////////////////////////////////////////////////
    279 
    280 } // namespace rtc
    281