Home | History | Annotate | Download | only in ws
      1 /*
      2  * Copyright (C) 2014 Square, Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp.internal.ws;
     17 
     18 import com.squareup.okhttp.MediaType;
     19 import com.squareup.okhttp.RequestBody;
     20 import com.squareup.okhttp.ResponseBody;
     21 import com.squareup.okhttp.internal.NamedRunnable;
     22 import com.squareup.okhttp.ws.WebSocket;
     23 import com.squareup.okhttp.ws.WebSocketListener;
     24 import java.io.IOException;
     25 import java.net.ProtocolException;
     26 import java.util.Random;
     27 import java.util.concurrent.Executor;
     28 import java.util.concurrent.atomic.AtomicBoolean;
     29 import okio.Buffer;
     30 import okio.BufferedSink;
     31 import okio.BufferedSource;
     32 import okio.Okio;
     33 
     34 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
     35 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
     36 import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;
     37 
     38 public abstract class RealWebSocket implements WebSocket {
     39   private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;
     40 
     41   private final WebSocketWriter writer;
     42   private final WebSocketReader reader;
     43   private final WebSocketListener listener;
     44 
     45   /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
     46   private volatile boolean writerSentClose;
     47   /** True after {@link IOException}. {@link #close(int, String)} becomes only valid call. */
     48   private boolean writerWantsClose;
     49   /** True after a close frame was read by the reader. No frames will follow it. */
     50   private boolean readerSentClose;
     51 
     52   /** True after calling {@link #close()} to free connection resources. */
     53   private final AtomicBoolean connectionClosed = new AtomicBoolean();
     54 
     55   public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
     56       final Executor replyExecutor, final WebSocketListener listener, final String url) {
     57     this.listener = listener;
     58 
     59     writer = new WebSocketWriter(isClient, sink, random);
     60     reader = new WebSocketReader(isClient, source, new FrameCallback() {
     61       @Override public void onMessage(ResponseBody message) throws IOException {
     62         listener.onMessage(message);
     63       }
     64 
     65       @Override public void onPing(final Buffer buffer) {
     66         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) {
     67           @Override protected void execute() {
     68             try {
     69               writer.writePong(buffer);
     70             } catch (IOException ignored) {
     71             }
     72           }
     73         });
     74       }
     75 
     76       @Override public void onPong(Buffer buffer) {
     77         listener.onPong(buffer);
     78       }
     79 
     80       @Override public void onClose(final int code, final String reason) {
     81         readerSentClose = true;
     82         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
     83           @Override protected void execute() {
     84             peerClose(code, reason);
     85           }
     86         });
     87       }
     88     });
     89   }
     90 
     91   /**
     92    * Read a single message from the web socket and deliver it to the listener. This method should
     93    * be called in a loop with the return value indicating whether looping should continue.
     94    */
     95   public boolean readMessage() {
     96     try {
     97       reader.processNextFrame();
     98       return !readerSentClose;
     99     } catch (IOException e) {
    100       readerErrorClose(e);
    101       return false;
    102     }
    103   }
    104 
    105   @Override public void sendMessage(RequestBody message) throws IOException {
    106     if (message == null) throw new NullPointerException("message == null");
    107     if (writerSentClose) throw new IllegalStateException("closed");
    108     if (writerWantsClose) throw new IllegalStateException("must call close()");
    109 
    110     MediaType contentType = message.contentType();
    111     if (contentType == null) {
    112       throw new IllegalArgumentException(
    113           "Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY.");
    114     }
    115     String contentSubtype = contentType.subtype();
    116 
    117     int formatOpcode;
    118     if (WebSocket.TEXT.subtype().equals(contentSubtype)) {
    119       formatOpcode = OPCODE_TEXT;
    120     } else if (WebSocket.BINARY.subtype().equals(contentSubtype)) {
    121       formatOpcode = OPCODE_BINARY;
    122     } else {
    123       throw new IllegalArgumentException("Unknown message content type: "
    124           + contentType.type() + "/" + contentType.subtype() // Omit any implicitly added charset.
    125           + ". Must use WebSocket.TEXT or WebSocket.BINARY.");
    126     }
    127 
    128     BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode));
    129     try {
    130       message.writeTo(sink);
    131       sink.close();
    132     } catch (IOException e) {
    133       writerWantsClose = true;
    134       throw e;
    135     }
    136   }
    137 
    138   @Override public void sendPing(Buffer payload) throws IOException {
    139     if (writerSentClose) throw new IllegalStateException("closed");
    140     if (writerWantsClose) throw new IllegalStateException("must call close()");
    141 
    142     try {
    143       writer.writePing(payload);
    144     } catch (IOException e) {
    145       writerWantsClose = true;
    146       throw e;
    147     }
    148   }
    149 
    150   /** Send an unsolicited pong with the specified payload. */
    151   public void sendPong(Buffer payload) throws IOException {
    152     if (writerSentClose) throw new IllegalStateException("closed");
    153     if (writerWantsClose) throw new IllegalStateException("must call close()");
    154 
    155     try {
    156       writer.writePong(payload);
    157     } catch (IOException e) {
    158       writerWantsClose = true;
    159       throw e;
    160     }
    161   }
    162 
    163   @Override public void close(int code, String reason) throws IOException {
    164     if (writerSentClose) throw new IllegalStateException("closed");
    165     writerSentClose = true;
    166 
    167     try {
    168       writer.writeClose(code, reason);
    169     } catch (IOException e) {
    170       if (connectionClosed.compareAndSet(false, true)) {
    171         // Try to close the connection without masking the original exception.
    172         try {
    173           close();
    174         } catch (IOException ignored) {
    175         }
    176       }
    177       throw e;
    178     }
    179   }
    180 
    181   /** Replies and closes this web socket when a close frame is read from the peer. */
    182   private void peerClose(int code, String reason) {
    183     if (!writerSentClose) {
    184       try {
    185         writer.writeClose(code, reason);
    186       } catch (IOException ignored) {
    187       }
    188     }
    189 
    190     if (connectionClosed.compareAndSet(false, true)) {
    191       try {
    192         close();
    193       } catch (IOException ignored) {
    194       }
    195     }
    196 
    197     listener.onClose(code, reason);
    198   }
    199 
    200   /** Called on the reader thread when an error occurs. */
    201   private void readerErrorClose(IOException e) {
    202     // For protocol exceptions, try to inform the server of such.
    203     if (!writerSentClose && e instanceof ProtocolException) {
    204       try {
    205         writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
    206       } catch (IOException ignored) {
    207       }
    208     }
    209 
    210     if (connectionClosed.compareAndSet(false, true)) {
    211       try {
    212         close();
    213       } catch (IOException ignored) {
    214       }
    215     }
    216 
    217     listener.onFailure(e, null);
    218   }
    219 
    220   /** Perform any tear-down work (close the connection, shutdown executors). */
    221   protected abstract void close() throws IOException;
    222 }
    223