1 /* 2 * Copyright (C) 2009 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "config.h" 32 #include "SocketStreamHandle.h" 33 34 #include "CString.h" 35 #include "GOwnPtr.h" 36 #include "KURL.h" 37 #include "Logging.h" 38 #include "NotFound.h" 39 #include "NotImplemented.h" 40 #include "SocketStreamError.h" 41 #include "SocketStreamHandleClient.h" 42 #include "Vector.h" 43 #include <gio/gio.h> 44 #include <glib.h> 45 46 #define READ_BUFFER_SIZE 1024 47 48 namespace WebCore { 49 50 // These functions immediately call the similarly named SocketStreamHandle methods. 51 static void connectedCallback(GSocketClient*, GAsyncResult*, void*); 52 static void readReadyCallback(GInputStream*, GAsyncResult*, void*); 53 static gboolean writeReadyCallback(GPollableOutputStream*, void*); 54 55 // Having a list of active handles means that we do not have to worry about WebCore 56 // reference counting in GLib callbacks. Once the handle is off the active handles list 57 // we just ignore it in the callback. We avoid a lot of extra checks and tricky 58 // situations this way. 59 static HashMap<void*, SocketStreamHandle*> gActiveHandles; 60 static SocketStreamHandle* getHandleFromId(void* id) 61 { 62 if (!gActiveHandles.contains(id)) 63 return 0; 64 return gActiveHandles.get(id); 65 } 66 67 static void deactivateHandle(SocketStreamHandle* handle) 68 { 69 gActiveHandles.remove(handle->id()); 70 } 71 72 static void* activateHandle(SocketStreamHandle* handle) 73 { 74 // The first id cannot be 0, because it conflicts with the HashMap emptyValue. 75 static gint currentHandleId = 1; 76 void* id = GINT_TO_POINTER(currentHandleId++); 77 gActiveHandles.set(id, handle); 78 return id; 79 } 80 81 SocketStreamHandle::SocketStreamHandle(const KURL& url, SocketStreamHandleClient* client) 82 : SocketStreamHandleBase(url, client) 83 , m_readBuffer(0) 84 { 85 unsigned int port = url.hasPort() ? url.port() : (url.protocolIs("wss") ? 443 : 80); 86 87 m_id = activateHandle(this); 88 GRefPtr<GSocketClient> socketClient = adoptGRef(g_socket_client_new()); 89 if (url.protocolIs("wss")) 90 g_socket_client_set_tls(socketClient.get(), TRUE); 91 g_socket_client_connect_to_host_async(socketClient.get(), url.host().utf8().data(), port, 0, 92 reinterpret_cast<GAsyncReadyCallback>(connectedCallback), m_id); 93 } 94 95 SocketStreamHandle::~SocketStreamHandle() 96 { 97 // If for some reason we were destroyed without closing, ensure that we are deactivated. 98 deactivateHandle(this); 99 setClient(0); 100 } 101 102 void SocketStreamHandle::connected(GSocketConnection* socketConnection, GError* error) 103 { 104 if (error) { 105 m_client->didFail(this, SocketStreamError(error->code)); 106 return; 107 } 108 109 m_socketConnection = adoptGRef(socketConnection); 110 m_outputStream = G_POLLABLE_OUTPUT_STREAM(g_io_stream_get_output_stream(G_IO_STREAM(m_socketConnection.get()))); 111 m_inputStream = g_io_stream_get_input_stream(G_IO_STREAM(m_socketConnection.get())); 112 113 m_readBuffer = new char[READ_BUFFER_SIZE]; 114 g_input_stream_read_async(m_inputStream.get(), m_readBuffer, READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, 0, 115 reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), m_id); 116 117 // The client can close the handle, potentially removing the last reference. 118 RefPtr<SocketStreamHandle> protect(this); 119 m_state = Open; 120 m_client->didOpen(this); 121 if (!m_socketConnection) // Client closed the connection. 122 return; 123 } 124 125 void SocketStreamHandle::readBytes(signed long bytesRead, GError* error) 126 { 127 if (error) { 128 m_client->didFail(this, SocketStreamError(error->code)); 129 return; 130 } 131 132 if (!bytesRead) { 133 close(); 134 return; 135 } 136 137 // The client can close the handle, potentially removing the last reference. 138 RefPtr<SocketStreamHandle> protect(this); 139 m_client->didReceiveData(this, m_readBuffer, bytesRead); 140 if (m_inputStream) // The client may have closed the connection. 141 g_input_stream_read_async(m_inputStream.get(), m_readBuffer, READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, 0, 142 reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), m_id); 143 } 144 145 void SocketStreamHandle::writeReady() 146 { 147 // We no longer have buffered data, so stop waiting for the socket to be writable. 148 if (!bufferedAmount()) { 149 stopWaitingForSocketWritability(); 150 return; 151 } 152 153 sendPendingData(); 154 } 155 156 int SocketStreamHandle::platformSend(const char* data, int length) 157 { 158 if (!g_pollable_output_stream_is_writable(m_outputStream.get())) { 159 beginWaitingForSocketWritability(); 160 return 0; 161 } 162 163 GOwnPtr<GError> error; 164 gssize written = g_pollable_output_stream_write_nonblocking(m_outputStream.get(), data, length, 0, &error.outPtr()); 165 if (error && !g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { 166 m_client->didFail(this, SocketStreamError(error->code)); // FIXME: Provide a sensible error. 167 return 0; 168 } 169 170 // If we did not send all the bytes we were given, we know that 171 // SocketStreamHandleBase will need to send more in the future. 172 if (written < length) 173 beginWaitingForSocketWritability(); 174 175 return written; 176 } 177 178 void SocketStreamHandle::platformClose() 179 { 180 // We remove this handle from the active handles list first, to disable all callbacks. 181 deactivateHandle(this); 182 stopWaitingForSocketWritability(); 183 184 if (m_socketConnection) { 185 GOwnPtr<GError> error; 186 g_io_stream_close(G_IO_STREAM(m_socketConnection.get()), 0, &error.outPtr()); 187 if (error) 188 m_client->didFail(this, SocketStreamError(error->code)); // FIXME: Provide a sensible error. 189 m_socketConnection = 0; 190 } 191 192 m_outputStream = 0; 193 m_inputStream = 0; 194 delete m_readBuffer; 195 196 m_client->didClose(this); 197 } 198 199 void SocketStreamHandle::didReceiveAuthenticationChallenge(const AuthenticationChallenge&) 200 { 201 notImplemented(); 202 } 203 204 void SocketStreamHandle::receivedCredential(const AuthenticationChallenge&, const Credential&) 205 { 206 notImplemented(); 207 } 208 209 void SocketStreamHandle::receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&) 210 { 211 notImplemented(); 212 } 213 214 void SocketStreamHandle::receivedCancellation(const AuthenticationChallenge&) 215 { 216 notImplemented(); 217 } 218 219 void SocketStreamHandle::beginWaitingForSocketWritability() 220 { 221 if (m_writeReadySource) // Already waiting. 222 return; 223 224 m_writeReadySource = adoptGRef(g_pollable_output_stream_create_source(m_outputStream.get(), 0)); 225 g_source_set_callback(m_writeReadySource.get(), reinterpret_cast<GSourceFunc>(writeReadyCallback), m_id, 0); 226 g_source_attach(m_writeReadySource.get(), 0); 227 } 228 229 void SocketStreamHandle::stopWaitingForSocketWritability() 230 { 231 if (!m_writeReadySource) // Not waiting. 232 return; 233 234 g_source_remove(g_source_get_id(m_writeReadySource.get())); 235 m_writeReadySource = 0; 236 } 237 238 static void connectedCallback(GSocketClient* client, GAsyncResult* result, void* id) 239 { 240 // Always finish the connection, even if this SocketStreamHandle was deactivated earlier. 241 GOwnPtr<GError> error; 242 GSocketConnection* socketConnection = g_socket_client_connect_to_host_finish(client, result, &error.outPtr()); 243 244 // The SocketStreamHandle has been deactivated, so just close the connection, ignoring errors. 245 SocketStreamHandle* handle = getHandleFromId(id); 246 if (!handle) { 247 g_io_stream_close(G_IO_STREAM(socketConnection), 0, &error.outPtr()); 248 return; 249 } 250 251 handle->connected(socketConnection, error.get()); 252 } 253 254 static void readReadyCallback(GInputStream* stream, GAsyncResult* result, void* id) 255 { 256 // Always finish the read, even if this SocketStreamHandle was deactivated earlier. 257 GOwnPtr<GError> error; 258 gssize bytesRead = g_input_stream_read_finish(stream, result, &error.outPtr()); 259 260 SocketStreamHandle* handle = getHandleFromId(id); 261 if (!handle) 262 return; 263 264 handle->readBytes(bytesRead, error.get()); 265 } 266 267 static gboolean writeReadyCallback(GPollableOutputStream*, void* id) 268 { 269 SocketStreamHandle* handle = getHandleFromId(id); 270 if (!handle) 271 return FALSE; 272 273 handle->writeReady(); 274 return TRUE; 275 } 276 277 } // namespace WebCore 278