/*
 * libjingle
 * Copyright 2004--2005, Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <iomanip>
#include <sstream>
#include <string>

#include "talk/base/asyncsocket.h"
#include "talk/base/logging.h"
#include "talk/base/socketfactory.h"
#include "talk/base/socketpool.h"
#include "talk/base/socketstream.h"
#include "talk/base/thread.h"

namespace talk_base {

///////////////////////////////////////////////////////////////////////////////
// StreamCache - Caches a set of open streams, defers creation to a separate
// StreamPool.
42 /////////////////////////////////////////////////////////////////////////////// 43 44 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { 45 } 46 47 StreamCache::~StreamCache() { 48 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 49 ++it) { 50 delete it->second; 51 } 52 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 53 ++it) { 54 delete it->second; 55 } 56 } 57 58 StreamInterface* StreamCache::RequestConnectedStream( 59 const SocketAddress& remote, int* err) { 60 LOG_F(LS_VERBOSE) << "(" << remote << ")"; 61 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 62 ++it) { 63 if (remote == it->first) { 64 it->second->SignalEvent.disconnect(this); 65 // Move from cached_ to active_ 66 active_.push_front(*it); 67 cached_.erase(it); 68 if (err) 69 *err = 0; 70 LOG_F(LS_VERBOSE) << "Providing cached stream"; 71 return active_.front().second; 72 } 73 } 74 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 75 // We track active streams so that we can remember their address 76 active_.push_front(ConnectedStream(remote, stream)); 77 LOG_F(LS_VERBOSE) << "Providing new stream"; 78 return active_.front().second; 79 } 80 return NULL; 81 } 82 83 void StreamCache::ReturnConnectedStream(StreamInterface* stream) { 84 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 85 ++it) { 86 if (stream == it->second) { 87 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 88 if (stream->GetState() == SS_CLOSED) { 89 // Return closed streams 90 LOG_F(LS_VERBOSE) << "Returning closed stream"; 91 pool_->ReturnConnectedStream(it->second); 92 } else { 93 // Monitor open streams 94 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); 95 LOG_F(LS_VERBOSE) << "Caching stream"; 96 cached_.push_front(*it); 97 } 98 active_.erase(it); 99 return; 100 } 101 } 102 ASSERT(false); 103 } 104 105 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { 106 if 
((events & SE_CLOSE) == 0) { 107 LOG_F(LS_WARNING) << "(" << events << ", " << err 108 << ") received non-close event"; 109 return; 110 } 111 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 112 ++it) { 113 if (stream == it->second) { 114 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 115 // We don't cache closed streams, so return it. 116 it->second->SignalEvent.disconnect(this); 117 LOG_F(LS_VERBOSE) << "Returning closed stream"; 118 pool_->ReturnConnectedStream(it->second); 119 cached_.erase(it); 120 return; 121 } 122 } 123 ASSERT(false); 124 } 125 126 ////////////////////////////////////////////////////////////////////// 127 // NewSocketPool 128 ////////////////////////////////////////////////////////////////////// 129 130 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { 131 } 132 133 NewSocketPool::~NewSocketPool() { 134 } 135 136 StreamInterface* 137 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { 138 AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM); 139 if (!socket) { 140 ASSERT(false); 141 if (err) 142 *err = -1; 143 return NULL; 144 } 145 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { 146 if (err) 147 *err = socket->GetError(); 148 delete socket; 149 return NULL; 150 } 151 if (err) 152 *err = 0; 153 return new SocketStream(socket); 154 } 155 156 void 157 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { 158 Thread::Current()->Dispose(stream); 159 } 160 161 ////////////////////////////////////////////////////////////////////// 162 // ReuseSocketPool 163 ////////////////////////////////////////////////////////////////////// 164 165 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) 166 : factory_(factory), stream_(NULL), checked_out_(false) { 167 } 168 169 ReuseSocketPool::~ReuseSocketPool() { 170 ASSERT(!checked_out_); 171 delete stream_; 172 } 173 174 StreamInterface* 175 ReuseSocketPool::RequestConnectedStream(const 
SocketAddress& remote, int* err) { 176 // Only one socket can be used from this "pool" at a time 177 ASSERT(!checked_out_); 178 if (!stream_) { 179 LOG_F(LS_VERBOSE) << "Creating new socket"; 180 AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM); 181 if (!socket) { 182 ASSERT(false); 183 if (err) 184 *err = -1; 185 return NULL; 186 } 187 stream_ = new SocketStream(socket); 188 } 189 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { 190 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; 191 } else { 192 remote_ = remote; 193 stream_->Close(); 194 if ((stream_->GetSocket()->Connect(remote_) != 0) 195 && !stream_->GetSocket()->IsBlocking()) { 196 if (err) 197 *err = stream_->GetSocket()->GetError(); 198 return NULL; 199 } else { 200 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; 201 } 202 } 203 stream_->SignalEvent.disconnect(this); 204 checked_out_ = true; 205 if (err) 206 *err = 0; 207 return stream_; 208 } 209 210 void 211 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { 212 ASSERT(stream == stream_); 213 ASSERT(checked_out_); 214 checked_out_ = false; 215 // Until the socket is reused, monitor it to determine if it closes. 216 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); 217 } 218 219 void 220 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { 221 ASSERT(stream == stream_); 222 ASSERT(!checked_out_); 223 224 // If the stream was written to and then immediately returned to us then 225 // we may get a writable notification for it, which we should ignore. 226 if (events == SE_WRITE) { 227 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; 228 return; 229 } 230 231 // If the peer sent data, we can't process it, so drop the connection. 232 // If the socket has closed, clean it up. 233 // In either case, we'll reconnect it the next time it is used. 
234 ASSERT(0 != (events & (SE_READ|SE_CLOSE))); 235 if (0 != (events & SE_CLOSE)) { 236 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; 237 } else { 238 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; 239 } 240 stream_->Close(); 241 } 242 243 /////////////////////////////////////////////////////////////////////////////// 244 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached 245 // LoggingAdapters. 246 /////////////////////////////////////////////////////////////////////////////// 247 248 LoggingPoolAdapter::LoggingPoolAdapter( 249 StreamPool* pool, LoggingSeverity level, const std::string& label, 250 bool binary_mode) 251 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { 252 } 253 254 LoggingPoolAdapter::~LoggingPoolAdapter() { 255 for (StreamList::iterator it = recycle_bin_.begin(); 256 it != recycle_bin_.end(); ++it) { 257 delete *it; 258 } 259 } 260 261 StreamInterface* LoggingPoolAdapter::RequestConnectedStream( 262 const SocketAddress& remote, int* err) { 263 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 264 ASSERT(SS_CLOSED != stream->GetState()); 265 std::stringstream ss; 266 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) 267 << stream << ")"; 268 LOG_V(level_) << ss.str() 269 << ((SS_OPEN == stream->GetState()) ? 
" Connected" 270 : " Connecting") 271 << " to " << remote; 272 if (recycle_bin_.empty()) { 273 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); 274 } 275 LoggingAdapter* logging = recycle_bin_.front(); 276 recycle_bin_.pop_front(); 277 logging->set_label(ss.str()); 278 logging->Attach(stream); 279 return logging; 280 } 281 return NULL; 282 } 283 284 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { 285 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); 286 pool_->ReturnConnectedStream(logging->Detach()); 287 recycle_bin_.push_back(logging); 288 } 289 290 /////////////////////////////////////////////////////////////////////////////// 291 292 } // namespace talk_base 293