1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <iomanip> 29 30 #include "talk/base/asyncsocket.h" 31 #include "talk/base/logging.h" 32 #include "talk/base/socketfactory.h" 33 #include "talk/base/socketpool.h" 34 #include "talk/base/socketstream.h" 35 #include "talk/base/thread.h" 36 37 namespace talk_base { 38 39 /////////////////////////////////////////////////////////////////////////////// 40 // StreamCache - Caches a set of open streams, defers creation to a separate 41 // StreamPool. 42 /////////////////////////////////////////////////////////////////////////////// 43 44 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { 45 } 46 47 StreamCache::~StreamCache() { 48 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 49 ++it) { 50 delete it->second; 51 } 52 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 53 ++it) { 54 delete it->second; 55 } 56 } 57 58 StreamInterface* StreamCache::RequestConnectedStream( 59 const SocketAddress& remote, int* err) { 60 LOG_F(LS_VERBOSE) << "(" << remote << ")"; 61 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 62 ++it) { 63 if (remote == it->first) { 64 it->second->SignalEvent.disconnect(this); 65 // Move from cached_ to active_ 66 active_.push_front(*it); 67 cached_.erase(it); 68 if (err) 69 *err = 0; 70 LOG_F(LS_VERBOSE) << "Providing cached stream"; 71 return active_.front().second; 72 } 73 } 74 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 75 // We track active streams so that we can remember their address 76 active_.push_front(ConnectedStream(remote, stream)); 77 LOG_F(LS_VERBOSE) << "Providing new stream"; 78 return active_.front().second; 79 } 80 return NULL; 81 } 82 83 void StreamCache::ReturnConnectedStream(StreamInterface* stream) { 84 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 85 ++it) { 86 if (stream == it->second) { 87 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 88 if (stream->GetState() == SS_CLOSED) { 89 // Return closed streams 90 LOG_F(LS_VERBOSE) << "Returning closed stream"; 91 pool_->ReturnConnectedStream(it->second); 92 } else { 93 // Monitor open streams 94 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); 95 LOG_F(LS_VERBOSE) << "Caching stream"; 96 cached_.push_front(*it); 97 } 98 active_.erase(it); 99 return; 100 } 101 } 102 ASSERT(false); 103 } 104 105 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { 106 if ((events & SE_CLOSE) == 0) { 107 LOG_F(LS_WARNING) << "(" << events << ", " << err 108 << ") received non-close event"; 109 return; 110 } 111 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 112 ++it) { 113 if (stream == it->second) { 114 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 115 // We don't cache closed streams, so return it. 116 it->second->SignalEvent.disconnect(this); 117 LOG_F(LS_VERBOSE) << "Returning closed stream"; 118 pool_->ReturnConnectedStream(it->second); 119 cached_.erase(it); 120 return; 121 } 122 } 123 ASSERT(false); 124 } 125 126 ////////////////////////////////////////////////////////////////////// 127 // NewSocketPool 128 ////////////////////////////////////////////////////////////////////// 129 130 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { 131 } 132 133 NewSocketPool::~NewSocketPool() { 134 } 135 136 StreamInterface* 137 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { 138 AsyncSocket* socket = 139 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); 140 if (!socket) { 141 if (err) 142 *err = -1; 143 return NULL; 144 } 145 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { 146 if (err) 147 *err = socket->GetError(); 148 delete socket; 149 return NULL; 150 } 151 if (err) 152 *err = 0; 153 return new SocketStream(socket); 154 } 155 156 void 157 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { 158 Thread::Current()->Dispose(stream); 159 } 160 161 ////////////////////////////////////////////////////////////////////// 162 // ReuseSocketPool 163 ////////////////////////////////////////////////////////////////////// 164 165 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) 166 : factory_(factory), stream_(NULL), checked_out_(false) { 167 } 168 169 ReuseSocketPool::~ReuseSocketPool() { 170 ASSERT(!checked_out_); 171 delete stream_; 172 } 173 174 StreamInterface* 175 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { 176 // Only one socket can be used from this "pool" at a time 177 ASSERT(!checked_out_); 178 if (!stream_) { 179 LOG_F(LS_VERBOSE) << "Creating new socket"; 180 int family = remote.family(); 181 // TODO: Deal with this when we/I clean up DNS resolution. 182 if (remote.IsUnresolvedIP()) { 183 family = AF_INET; 184 } 185 AsyncSocket* socket = 186 factory_->CreateAsyncSocket(family, SOCK_STREAM); 187 if (!socket) { 188 if (err) 189 *err = -1; 190 return NULL; 191 } 192 stream_ = new SocketStream(socket); 193 } 194 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { 195 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; 196 } else { 197 remote_ = remote; 198 stream_->Close(); 199 if ((stream_->GetSocket()->Connect(remote_) != 0) 200 && !stream_->GetSocket()->IsBlocking()) { 201 if (err) 202 *err = stream_->GetSocket()->GetError(); 203 return NULL; 204 } else { 205 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; 206 } 207 } 208 stream_->SignalEvent.disconnect(this); 209 checked_out_ = true; 210 if (err) 211 *err = 0; 212 return stream_; 213 } 214 215 void 216 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { 217 ASSERT(stream == stream_); 218 ASSERT(checked_out_); 219 checked_out_ = false; 220 // Until the socket is reused, monitor it to determine if it closes. 221 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); 222 } 223 224 void 225 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { 226 ASSERT(stream == stream_); 227 ASSERT(!checked_out_); 228 229 // If the stream was written to and then immediately returned to us then 230 // we may get a writable notification for it, which we should ignore. 231 if (events == SE_WRITE) { 232 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; 233 return; 234 } 235 236 // If the peer sent data, we can't process it, so drop the connection. 237 // If the socket has closed, clean it up. 238 // In either case, we'll reconnect it the next time it is used. 239 ASSERT(0 != (events & (SE_READ|SE_CLOSE))); 240 if (0 != (events & SE_CLOSE)) { 241 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; 242 } else { 243 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; 244 } 245 stream_->Close(); 246 } 247 248 /////////////////////////////////////////////////////////////////////////////// 249 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached 250 // LoggingAdapters. 251 /////////////////////////////////////////////////////////////////////////////// 252 253 LoggingPoolAdapter::LoggingPoolAdapter( 254 StreamPool* pool, LoggingSeverity level, const std::string& label, 255 bool binary_mode) 256 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { 257 } 258 259 LoggingPoolAdapter::~LoggingPoolAdapter() { 260 for (StreamList::iterator it = recycle_bin_.begin(); 261 it != recycle_bin_.end(); ++it) { 262 delete *it; 263 } 264 } 265 266 StreamInterface* LoggingPoolAdapter::RequestConnectedStream( 267 const SocketAddress& remote, int* err) { 268 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 269 ASSERT(SS_CLOSED != stream->GetState()); 270 std::stringstream ss; 271 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) 272 << stream << ")"; 273 LOG_V(level_) << ss.str() 274 << ((SS_OPEN == stream->GetState()) ? " Connected" 275 : " Connecting") 276 << " to " << remote; 277 if (recycle_bin_.empty()) { 278 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); 279 } 280 LoggingAdapter* logging = recycle_bin_.front(); 281 recycle_bin_.pop_front(); 282 logging->set_label(ss.str()); 283 logging->Attach(stream); 284 return logging; 285 } 286 return NULL; 287 } 288 289 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { 290 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); 291 pool_->ReturnConnectedStream(logging->Detach()); 292 recycle_bin_.push_back(logging); 293 } 294 295 /////////////////////////////////////////////////////////////////////////////// 296 297 } // namespace talk_base 298