Home | History | Annotate | Download | only in ws
      1 /*
      2  * Copyright (C) 2014 Square, Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp.internal.ws;
     17 
     18 import com.squareup.okhttp.internal.NamedRunnable;
     19 import com.squareup.okhttp.ws.WebSocket;
     20 import com.squareup.okhttp.ws.WebSocketListener;
     21 import java.io.IOException;
     22 import java.net.ProtocolException;
     23 import java.util.Random;
     24 import java.util.concurrent.Executor;
     25 import okio.Buffer;
     26 import okio.BufferedSink;
     27 import okio.BufferedSource;
     28 
     29 import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;
     30 
     31 public abstract class RealWebSocket implements WebSocket {
     32   /** A close code which indicates that the peer encountered a protocol exception. */
     33   private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;
     34 
     35   private final WebSocketWriter writer;
     36   private final WebSocketReader reader;
     37   private final WebSocketListener listener;
     38 
     39   /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
     40   private volatile boolean writerSentClose;
     41   /** True after a close frame was read by the reader. No frames will follow it. */
     42   private volatile boolean readerSentClose;
     43   /** Lock required to negotiate closing the connection. */
     44   private final Object closeLock = new Object();
     45 
     46   public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
     47       final Executor replyExecutor, final WebSocketListener listener, final String url) {
     48     this.listener = listener;
     49 
     50     writer = new WebSocketWriter(isClient, sink, random);
     51     reader = new WebSocketReader(isClient, source, new FrameCallback() {
     52       @Override public void onMessage(BufferedSource source, PayloadType type) throws IOException {
     53         listener.onMessage(source, type);
     54       }
     55 
     56       @Override public void onPing(final Buffer buffer) {
     57         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) {
     58           @Override protected void execute() {
     59             try {
     60               writer.writePong(buffer);
     61             } catch (IOException ignored) {
     62             }
     63           }
     64         });
     65       }
     66 
     67       @Override public void onPong(Buffer buffer) {
     68         listener.onPong(buffer);
     69       }
     70 
     71       @Override public void onClose(final int code, final String reason) {
     72         final boolean writeCloseResponse;
     73         synchronized (closeLock) {
     74           readerSentClose = true;
     75 
     76           // If the writer has not indicated a desire to close we will write a close response.
     77           writeCloseResponse = !writerSentClose;
     78         }
     79 
     80         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
     81           @Override protected void execute() {
     82             peerClose(code, reason, writeCloseResponse);
     83           }
     84         });
     85       }
     86     });
     87   }
     88 
     89   /**
     90    * Read a single message from the web socket and deliver it to the listener. This method should
     91    * be called in a loop with the return value indicating whether looping should continue.
     92    */
     93   public boolean readMessage() {
     94     try {
     95       reader.processNextFrame();
     96       return !readerSentClose;
     97     } catch (IOException e) {
     98       readerErrorClose(e);
     99       return false;
    100     }
    101   }
    102 
    103   @Override public BufferedSink newMessageSink(PayloadType type) {
    104     if (writerSentClose) throw new IllegalStateException("closed");
    105     return writer.newMessageSink(type);
    106   }
    107 
    108   @Override public void sendMessage(PayloadType type, Buffer payload) throws IOException {
    109     if (writerSentClose) throw new IllegalStateException("closed");
    110     writer.sendMessage(type, payload);
    111   }
    112 
    113   @Override public void sendPing(Buffer payload) throws IOException {
    114     if (writerSentClose) throw new IllegalStateException("closed");
    115     writer.writePing(payload);
    116   }
    117 
    118   /** Send an unsolicited pong with the specified payload. */
    119   public void sendPong(Buffer payload) throws IOException {
    120     if (writerSentClose) throw new IllegalStateException("closed");
    121     writer.writePong(payload);
    122   }
    123 
    124   @Override public void close(int code, String reason) throws IOException {
    125     if (writerSentClose) throw new IllegalStateException("closed");
    126 
    127     boolean closeConnection;
    128     synchronized (closeLock) {
    129       writerSentClose = true;
    130 
    131       // If the reader has also indicated a desire to close we will close the connection.
    132       closeConnection = readerSentClose;
    133     }
    134 
    135     writer.writeClose(code, reason);
    136 
    137     if (closeConnection) {
    138       closeConnection();
    139     }
    140   }
    141 
    142   /** Replies and closes this web socket when a close frame is read from the peer. */
    143   private void peerClose(int code, String reason, boolean writeCloseResponse) {
    144     if (writeCloseResponse) {
    145       try {
    146         writer.writeClose(code, reason);
    147       } catch (IOException ignored) {
    148       }
    149     }
    150 
    151     try {
    152       closeConnection();
    153     } catch (IOException ignored) {
    154     }
    155 
    156     listener.onClose(code, reason);
    157   }
    158 
    159   /** Called on the reader thread when an error occurs. */
    160   private void readerErrorClose(IOException e) {
    161     boolean writeCloseResponse;
    162     synchronized (closeLock) {
    163       readerSentClose = true;
    164 
    165       // If the writer has not closed we will close the connection.
    166       writeCloseResponse = !writerSentClose;
    167     }
    168 
    169     if (writeCloseResponse) {
    170       if (e instanceof ProtocolException) {
    171         // For protocol exceptions, try to inform the server of such.
    172         try {
    173           writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
    174         } catch (IOException ignored) {
    175         }
    176       }
    177     }
    178 
    179     try {
    180       closeConnection();
    181     } catch (IOException ignored) {
    182     }
    183 
    184     listener.onFailure(e);
    185   }
    186 
    187   /** Perform any tear-down work on the connection (close the socket, recycle, etc.). */
    188   protected abstract void closeConnection() throws IOException;
    189 }
    190