1 /* 2 * Copyright 2014 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 package org.appspot.apprtc; 12 13 import org.appspot.apprtc.util.AsyncHttpURLConnection; 14 import org.appspot.apprtc.util.AsyncHttpURLConnection.AsyncHttpEvents; 15 import org.appspot.apprtc.util.LooperExecutor; 16 17 import android.util.Log; 18 19 import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver; 20 import de.tavendo.autobahn.WebSocketConnection; 21 import de.tavendo.autobahn.WebSocketException; 22 23 import org.json.JSONException; 24 import org.json.JSONObject; 25 26 import java.net.URI; 27 import java.net.URISyntaxException; 28 import java.util.LinkedList; 29 30 /** 31 * WebSocket client implementation. 32 * 33 * <p>All public methods should be called from a looper executor thread 34 * passed in a constructor, otherwise exception will be thrown. 35 * All events are dispatched on the same thread. 36 */ 37 38 public class WebSocketChannelClient { 39 private static final String TAG = "WSChannelRTCClient"; 40 private static final int CLOSE_TIMEOUT = 1000; 41 private final WebSocketChannelEvents events; 42 private final LooperExecutor executor; 43 private WebSocketConnection ws; 44 private WebSocketObserver wsObserver; 45 private String wsServerUrl; 46 private String postServerUrl; 47 private String roomID; 48 private String clientID; 49 private WebSocketConnectionState state; 50 private final Object closeEventLock = new Object(); 51 private boolean closeEvent; 52 // WebSocket send queue. Messages are added to the queue when WebSocket 53 // client is not registered and are consumed in register() call. 54 private final LinkedList<String> wsSendQueue; 55 56 /** 57 * Possible WebSocket connection states. 58 */ 59 public enum WebSocketConnectionState { 60 NEW, CONNECTED, REGISTERED, CLOSED, ERROR 61 }; 62 63 /** 64 * Callback interface for messages delivered on WebSocket. 65 * All events are dispatched from a looper executor thread. 66 */ 67 public interface WebSocketChannelEvents { 68 public void onWebSocketMessage(final String message); 69 public void onWebSocketClose(); 70 public void onWebSocketError(final String description); 71 } 72 73 public WebSocketChannelClient(LooperExecutor executor, WebSocketChannelEvents events) { 74 this.executor = executor; 75 this.events = events; 76 roomID = null; 77 clientID = null; 78 wsSendQueue = new LinkedList<String>(); 79 state = WebSocketConnectionState.NEW; 80 } 81 82 public WebSocketConnectionState getState() { 83 return state; 84 } 85 86 public void connect(final String wsUrl, final String postUrl) { 87 checkIfCalledOnValidThread(); 88 if (state != WebSocketConnectionState.NEW) { 89 Log.e(TAG, "WebSocket is already connected."); 90 return; 91 } 92 wsServerUrl = wsUrl; 93 postServerUrl = postUrl; 94 closeEvent = false; 95 96 Log.d(TAG, "Connecting WebSocket to: " + wsUrl + ". Post URL: " + postUrl); 97 ws = new WebSocketConnection(); 98 wsObserver = new WebSocketObserver(); 99 try { 100 ws.connect(new URI(wsServerUrl), wsObserver); 101 } catch (URISyntaxException e) { 102 reportError("URI error: " + e.getMessage()); 103 } catch (WebSocketException e) { 104 reportError("WebSocket connection error: " + e.getMessage()); 105 } 106 } 107 108 public void register(final String roomID, final String clientID) { 109 checkIfCalledOnValidThread(); 110 this.roomID = roomID; 111 this.clientID = clientID; 112 if (state != WebSocketConnectionState.CONNECTED) { 113 Log.w(TAG, "WebSocket register() in state " + state); 114 return; 115 } 116 Log.d(TAG, "Registering WebSocket for room " + roomID + ". CLientID: " + clientID); 117 JSONObject json = new JSONObject(); 118 try { 119 json.put("cmd", "register"); 120 json.put("roomid", roomID); 121 json.put("clientid", clientID); 122 Log.d(TAG, "C->WSS: " + json.toString()); 123 ws.sendTextMessage(json.toString()); 124 state = WebSocketConnectionState.REGISTERED; 125 // Send any previously accumulated messages. 126 for (String sendMessage : wsSendQueue) { 127 send(sendMessage); 128 } 129 wsSendQueue.clear(); 130 } catch (JSONException e) { 131 reportError("WebSocket register JSON error: " + e.getMessage()); 132 } 133 } 134 135 public void send(String message) { 136 checkIfCalledOnValidThread(); 137 switch (state) { 138 case NEW: 139 case CONNECTED: 140 // Store outgoing messages and send them after websocket client 141 // is registered. 142 Log.d(TAG, "WS ACC: " + message); 143 wsSendQueue.add(message); 144 return; 145 case ERROR: 146 case CLOSED: 147 Log.e(TAG, "WebSocket send() in error or closed state : " + message); 148 return; 149 case REGISTERED: 150 JSONObject json = new JSONObject(); 151 try { 152 json.put("cmd", "send"); 153 json.put("msg", message); 154 message = json.toString(); 155 Log.d(TAG, "C->WSS: " + message); 156 ws.sendTextMessage(message); 157 } catch (JSONException e) { 158 reportError("WebSocket send JSON error: " + e.getMessage()); 159 } 160 break; 161 } 162 return; 163 } 164 165 // This call can be used to send WebSocket messages before WebSocket 166 // connection is opened. 167 public void post(String message) { 168 checkIfCalledOnValidThread(); 169 sendWSSMessage("POST", message); 170 } 171 172 public void disconnect(boolean waitForComplete) { 173 checkIfCalledOnValidThread(); 174 Log.d(TAG, "Disonnect WebSocket. State: " + state); 175 if (state == WebSocketConnectionState.REGISTERED) { 176 // Send "bye" to WebSocket server. 177 send("{\"type\": \"bye\"}"); 178 state = WebSocketConnectionState.CONNECTED; 179 // Send http DELETE to http WebSocket server. 180 sendWSSMessage("DELETE", ""); 181 } 182 // Close WebSocket in CONNECTED or ERROR states only. 183 if (state == WebSocketConnectionState.CONNECTED 184 || state == WebSocketConnectionState.ERROR) { 185 ws.disconnect(); 186 state = WebSocketConnectionState.CLOSED; 187 188 // Wait for websocket close event to prevent websocket library from 189 // sending any pending messages to deleted looper thread. 190 if (waitForComplete) { 191 synchronized (closeEventLock) { 192 while (!closeEvent) { 193 try { 194 closeEventLock.wait(CLOSE_TIMEOUT); 195 break; 196 } catch (InterruptedException e) { 197 Log.e(TAG, "Wait error: " + e.toString()); 198 } 199 } 200 } 201 } 202 } 203 Log.d(TAG, "Disonnecting WebSocket done."); 204 } 205 206 private void reportError(final String errorMessage) { 207 Log.e(TAG, errorMessage); 208 executor.execute(new Runnable() { 209 @Override 210 public void run() { 211 if (state != WebSocketConnectionState.ERROR) { 212 state = WebSocketConnectionState.ERROR; 213 events.onWebSocketError(errorMessage); 214 } 215 } 216 }); 217 } 218 219 // Asynchronously send POST/DELETE to WebSocket server. 220 private void sendWSSMessage(final String method, final String message) { 221 String postUrl = postServerUrl + "/" + roomID + "/" + clientID; 222 Log.d(TAG, "WS " + method + " : " + postUrl + " : " + message); 223 AsyncHttpURLConnection httpConnection = new AsyncHttpURLConnection( 224 method, postUrl, message, new AsyncHttpEvents() { 225 @Override 226 public void onHttpError(String errorMessage) { 227 reportError("WS " + method + " error: " + errorMessage); 228 } 229 230 @Override 231 public void onHttpComplete(String response) { 232 } 233 }); 234 httpConnection.send(); 235 } 236 237 // Helper method for debugging purposes. Ensures that WebSocket method is 238 // called on a looper thread. 239 private void checkIfCalledOnValidThread() { 240 if (!executor.checkOnLooperThread()) { 241 throw new IllegalStateException( 242 "WebSocket method is not called on valid thread"); 243 } 244 } 245 246 private class WebSocketObserver implements WebSocketConnectionObserver { 247 @Override 248 public void onOpen() { 249 Log.d(TAG, "WebSocket connection opened to: " + wsServerUrl); 250 executor.execute(new Runnable() { 251 @Override 252 public void run() { 253 state = WebSocketConnectionState.CONNECTED; 254 // Check if we have pending register request. 255 if (roomID != null && clientID != null) { 256 register(roomID, clientID); 257 } 258 } 259 }); 260 } 261 262 @Override 263 public void onClose(WebSocketCloseNotification code, String reason) { 264 Log.d(TAG, "WebSocket connection closed. Code: " + code 265 + ". Reason: " + reason + ". State: " + state); 266 synchronized (closeEventLock) { 267 closeEvent = true; 268 closeEventLock.notify(); 269 } 270 executor.execute(new Runnable() { 271 @Override 272 public void run() { 273 if (state != WebSocketConnectionState.CLOSED) { 274 state = WebSocketConnectionState.CLOSED; 275 events.onWebSocketClose(); 276 } 277 } 278 }); 279 } 280 281 @Override 282 public void onTextMessage(String payload) { 283 Log.d(TAG, "WSS->C: " + payload); 284 final String message = payload; 285 executor.execute(new Runnable() { 286 @Override 287 public void run() { 288 if (state == WebSocketConnectionState.CONNECTED 289 || state == WebSocketConnectionState.REGISTERED) { 290 events.onWebSocketMessage(message); 291 } 292 } 293 }); 294 } 295 296 @Override 297 public void onRawTextMessage(byte[] payload) { 298 } 299 300 @Override 301 public void onBinaryMessage(byte[] payload) { 302 } 303 } 304 305 } 306