Home | History | Annotate | Download | only in apprtc
      1 /*
      2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 package org.appspot.apprtc;
     12 
     13 import org.appspot.apprtc.util.AsyncHttpURLConnection;
     14 import org.appspot.apprtc.util.AsyncHttpURLConnection.AsyncHttpEvents;
     15 import org.appspot.apprtc.util.LooperExecutor;
     16 
     17 import android.util.Log;
     18 
     19 import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver;
     20 import de.tavendo.autobahn.WebSocketConnection;
     21 import de.tavendo.autobahn.WebSocketException;
     22 
     23 import org.json.JSONException;
     24 import org.json.JSONObject;
     25 
     26 import java.net.URI;
     27 import java.net.URISyntaxException;
     28 import java.util.LinkedList;
     29 
     30 /**
     31  * WebSocket client implementation.
     32  *
     33  * <p>All public methods should be called from a looper executor thread
     34  * passed in a constructor, otherwise exception will be thrown.
     35  * All events are dispatched on the same thread.
     36  */
     37 
     38 public class WebSocketChannelClient {
     39   private static final String TAG = "WSChannelRTCClient";
     40   private static final int CLOSE_TIMEOUT = 1000;
     41   private final WebSocketChannelEvents events;
     42   private final LooperExecutor executor;
     43   private WebSocketConnection ws;
     44   private WebSocketObserver wsObserver;
     45   private String wsServerUrl;
     46   private String postServerUrl;
     47   private String roomID;
     48   private String clientID;
     49   private WebSocketConnectionState state;
     50   private final Object closeEventLock = new Object();
     51   private boolean closeEvent;
     52   // WebSocket send queue. Messages are added to the queue when WebSocket
     53   // client is not registered and are consumed in register() call.
     54   private final LinkedList<String> wsSendQueue;
     55 
     56   /**
     57    * Possible WebSocket connection states.
     58    */
     59   public enum WebSocketConnectionState {
     60     NEW, CONNECTED, REGISTERED, CLOSED, ERROR
     61   };
     62 
     63   /**
     64    * Callback interface for messages delivered on WebSocket.
     65    * All events are dispatched from a looper executor thread.
     66    */
     67   public interface WebSocketChannelEvents {
     68     public void onWebSocketMessage(final String message);
     69     public void onWebSocketClose();
     70     public void onWebSocketError(final String description);
     71   }
     72 
     73   public WebSocketChannelClient(LooperExecutor executor, WebSocketChannelEvents events) {
     74     this.executor = executor;
     75     this.events = events;
     76     roomID = null;
     77     clientID = null;
     78     wsSendQueue = new LinkedList<String>();
     79     state = WebSocketConnectionState.NEW;
     80   }
     81 
     82   public WebSocketConnectionState getState() {
     83     return state;
     84   }
     85 
     86   public void connect(final String wsUrl, final String postUrl) {
     87     checkIfCalledOnValidThread();
     88     if (state != WebSocketConnectionState.NEW) {
     89       Log.e(TAG, "WebSocket is already connected.");
     90       return;
     91     }
     92     wsServerUrl = wsUrl;
     93     postServerUrl = postUrl;
     94     closeEvent = false;
     95 
     96     Log.d(TAG, "Connecting WebSocket to: " + wsUrl + ". Post URL: " + postUrl);
     97     ws = new WebSocketConnection();
     98     wsObserver = new WebSocketObserver();
     99     try {
    100       ws.connect(new URI(wsServerUrl), wsObserver);
    101     } catch (URISyntaxException e) {
    102       reportError("URI error: " + e.getMessage());
    103     } catch (WebSocketException e) {
    104       reportError("WebSocket connection error: " + e.getMessage());
    105     }
    106   }
    107 
    108   public void register(final String roomID, final String clientID) {
    109     checkIfCalledOnValidThread();
    110     this.roomID = roomID;
    111     this.clientID = clientID;
    112     if (state != WebSocketConnectionState.CONNECTED) {
    113       Log.w(TAG, "WebSocket register() in state " + state);
    114       return;
    115     }
    116     Log.d(TAG, "Registering WebSocket for room " + roomID + ". CLientID: " + clientID);
    117     JSONObject json = new JSONObject();
    118     try {
    119       json.put("cmd", "register");
    120       json.put("roomid", roomID);
    121       json.put("clientid", clientID);
    122       Log.d(TAG, "C->WSS: " + json.toString());
    123       ws.sendTextMessage(json.toString());
    124       state = WebSocketConnectionState.REGISTERED;
    125       // Send any previously accumulated messages.
    126       for (String sendMessage : wsSendQueue) {
    127         send(sendMessage);
    128       }
    129       wsSendQueue.clear();
    130     } catch (JSONException e) {
    131       reportError("WebSocket register JSON error: " + e.getMessage());
    132     }
    133   }
    134 
    135   public void send(String message) {
    136     checkIfCalledOnValidThread();
    137     switch (state) {
    138       case NEW:
    139       case CONNECTED:
    140         // Store outgoing messages and send them after websocket client
    141         // is registered.
    142         Log.d(TAG, "WS ACC: " + message);
    143         wsSendQueue.add(message);
    144         return;
    145       case ERROR:
    146       case CLOSED:
    147         Log.e(TAG, "WebSocket send() in error or closed state : " + message);
    148         return;
    149       case REGISTERED:
    150         JSONObject json = new JSONObject();
    151         try {
    152           json.put("cmd", "send");
    153           json.put("msg", message);
    154           message = json.toString();
    155           Log.d(TAG, "C->WSS: " + message);
    156           ws.sendTextMessage(message);
    157         } catch (JSONException e) {
    158           reportError("WebSocket send JSON error: " + e.getMessage());
    159         }
    160         break;
    161     }
    162     return;
    163   }
    164 
    165   // This call can be used to send WebSocket messages before WebSocket
    166   // connection is opened.
    167   public void post(String message) {
    168     checkIfCalledOnValidThread();
    169     sendWSSMessage("POST", message);
    170   }
    171 
    172   public void disconnect(boolean waitForComplete) {
    173     checkIfCalledOnValidThread();
    174     Log.d(TAG, "Disonnect WebSocket. State: " + state);
    175     if (state == WebSocketConnectionState.REGISTERED) {
    176       // Send "bye" to WebSocket server.
    177       send("{\"type\": \"bye\"}");
    178       state = WebSocketConnectionState.CONNECTED;
    179       // Send http DELETE to http WebSocket server.
    180       sendWSSMessage("DELETE", "");
    181     }
    182     // Close WebSocket in CONNECTED or ERROR states only.
    183     if (state == WebSocketConnectionState.CONNECTED
    184         || state == WebSocketConnectionState.ERROR) {
    185       ws.disconnect();
    186       state = WebSocketConnectionState.CLOSED;
    187 
    188       // Wait for websocket close event to prevent websocket library from
    189       // sending any pending messages to deleted looper thread.
    190       if (waitForComplete) {
    191         synchronized (closeEventLock) {
    192           while (!closeEvent) {
    193             try {
    194               closeEventLock.wait(CLOSE_TIMEOUT);
    195               break;
    196             } catch (InterruptedException e) {
    197               Log.e(TAG, "Wait error: " + e.toString());
    198             }
    199           }
    200         }
    201       }
    202     }
    203     Log.d(TAG, "Disonnecting WebSocket done.");
    204   }
    205 
    206   private void reportError(final String errorMessage) {
    207     Log.e(TAG, errorMessage);
    208     executor.execute(new Runnable() {
    209       @Override
    210       public void run() {
    211         if (state != WebSocketConnectionState.ERROR) {
    212           state = WebSocketConnectionState.ERROR;
    213           events.onWebSocketError(errorMessage);
    214         }
    215       }
    216     });
    217   }
    218 
    219   // Asynchronously send POST/DELETE to WebSocket server.
    220   private void sendWSSMessage(final String method, final String message) {
    221     String postUrl = postServerUrl + "/" + roomID + "/" + clientID;
    222     Log.d(TAG, "WS " + method + " : " + postUrl + " : " + message);
    223     AsyncHttpURLConnection httpConnection = new AsyncHttpURLConnection(
    224         method, postUrl, message, new AsyncHttpEvents() {
    225           @Override
    226           public void onHttpError(String errorMessage) {
    227             reportError("WS " + method + " error: " + errorMessage);
    228           }
    229 
    230           @Override
    231           public void onHttpComplete(String response) {
    232           }
    233         });
    234     httpConnection.send();
    235   }
    236 
    237    // Helper method for debugging purposes. Ensures that WebSocket method is
    238    // called on a looper thread.
    239   private void checkIfCalledOnValidThread() {
    240     if (!executor.checkOnLooperThread()) {
    241       throw new IllegalStateException(
    242           "WebSocket method is not called on valid thread");
    243     }
    244   }
    245 
    246   private class WebSocketObserver implements WebSocketConnectionObserver {
    247     @Override
    248     public void onOpen() {
    249       Log.d(TAG, "WebSocket connection opened to: " + wsServerUrl);
    250       executor.execute(new Runnable() {
    251         @Override
    252         public void run() {
    253           state = WebSocketConnectionState.CONNECTED;
    254           // Check if we have pending register request.
    255           if (roomID != null && clientID != null) {
    256             register(roomID, clientID);
    257           }
    258         }
    259       });
    260     }
    261 
    262     @Override
    263     public void onClose(WebSocketCloseNotification code, String reason) {
    264       Log.d(TAG, "WebSocket connection closed. Code: " + code
    265           + ". Reason: " + reason + ". State: " + state);
    266       synchronized (closeEventLock) {
    267         closeEvent = true;
    268         closeEventLock.notify();
    269       }
    270       executor.execute(new Runnable() {
    271         @Override
    272         public void run() {
    273           if (state != WebSocketConnectionState.CLOSED) {
    274             state = WebSocketConnectionState.CLOSED;
    275             events.onWebSocketClose();
    276           }
    277         }
    278       });
    279     }
    280 
    281     @Override
    282     public void onTextMessage(String payload) {
    283       Log.d(TAG, "WSS->C: " + payload);
    284       final String message = payload;
    285       executor.execute(new Runnable() {
    286         @Override
    287         public void run() {
    288           if (state == WebSocketConnectionState.CONNECTED
    289               || state == WebSocketConnectionState.REGISTERED) {
    290             events.onWebSocketMessage(message);
    291           }
    292         }
    293       });
    294     }
    295 
    296     @Override
    297     public void onRawTextMessage(byte[] payload) {
    298     }
    299 
    300     @Override
    301     public void onBinaryMessage(byte[] payload) {
    302     }
    303   }
    304 
    305 }
    306