/*
 * Copyright (C) 2014 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.squareup.okhttp.internal.ws;

import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.ws.WebSocket;
import com.squareup.okhttp.ws.WebSocketListener;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.Random;
import java.util.concurrent.Executor;
import okio.Buffer;
import okio.BufferedSink;
import okio.BufferedSource;

import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;

/**
 * A {@link WebSocket} that pairs a {@link WebSocketReader} with a {@link WebSocketWriter} and
 * coordinates the closing handshake between them.
 *
 * <p>Frames are read by calling {@link #readMessage()} in a loop. Replies to pings and to peer
 * close frames are written from tasks submitted to the reply executor supplied at construction.
 * Subclasses supply the actual connection tear-down via {@link #closeConnection()}, which this
 * class invokes at most once per instance.
 */
public abstract class RealWebSocket implements WebSocket {
  /** A close code which indicates that the peer encountered a protocol exception. */
  private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;

  private final WebSocketWriter writer;
  private final WebSocketReader reader;
  private final WebSocketListener listener;

  /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
  private volatile boolean writerSentClose;
  /** True after a close frame was read by the reader. No frames will follow it. */
  private volatile boolean readerSentClose;
  /** True once {@link #closeConnection()} has been invoked. Guarded by {@link #closeLock}. */
  private boolean connectionClosed;
  /** Lock required to negotiate closing the connection. */
  private final Object closeLock = new Object();

  /**
   * @param isClient passed through to the frame reader and writer.
   * @param source stream from which incoming frames are read.
   * @param sink stream to which outgoing frames are written.
   * @param random passed through to the frame writer.
   * @param replyExecutor runs the tasks which write pong and close replies.
   * @param listener receives messages, pongs, close and failure notifications.
   * @param url used only to name the reply tasks.
   */
  public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
      final Executor replyExecutor, final WebSocketListener listener, final String url) {
    this.listener = listener;

    writer = new WebSocketWriter(isClient, sink, random);
    reader = new WebSocketReader(isClient, source, new FrameCallback() {
      @Override public void onMessage(BufferedSource source, PayloadType type) throws IOException {
        listener.onMessage(source, type);
      }

      @Override public void onPing(final Buffer buffer) {
        replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) {
          @Override protected void execute() {
            try {
              writer.writePong(buffer);
            } catch (IOException ignored) {
              // Best effort: a failed pong reply is not reported to the listener from here.
            }
          }
        });
      }

      @Override public void onPong(Buffer buffer) {
        listener.onPong(buffer);
      }

      @Override public void onClose(final int code, final String reason) {
        final boolean writeCloseResponse;
        synchronized (closeLock) {
          readerSentClose = true;

          // If the writer has not indicated a desire to close we will write a close response.
          writeCloseResponse = !writerSentClose;
        }

        replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
          @Override protected void execute() {
            peerClose(code, reason, writeCloseResponse);
          }
        });
      }
    });
  }

  /**
   * Read a single message from the web socket and deliver it to the listener. This method should
   * be called in a loop with the return value indicating whether looping should continue.
   *
   * @return true if another call should be made; false once a close frame has been read or
   *     reading failed (in which case the listener's {@code onFailure} has been invoked).
   */
  public boolean readMessage() {
    try {
      reader.processNextFrame();
      return !readerSentClose;
    } catch (IOException e) {
      readerErrorClose(e);
      return false;
    }
  }

  /** @throws IllegalStateException if {@link #close(int, String)} has already been called. */
  @Override public BufferedSink newMessageSink(PayloadType type) {
    if (writerSentClose) throw new IllegalStateException("closed");
    return writer.newMessageSink(type);
  }

  /** @throws IllegalStateException if {@link #close(int, String)} has already been called. */
  @Override public void sendMessage(PayloadType type, Buffer payload) throws IOException {
    if (writerSentClose) throw new IllegalStateException("closed");
    writer.sendMessage(type, payload);
  }

  /** @throws IllegalStateException if {@link #close(int, String)} has already been called. */
  @Override public void sendPing(Buffer payload) throws IOException {
    if (writerSentClose) throw new IllegalStateException("closed");
    writer.writePing(payload);
  }

  /**
   * Send an unsolicited pong with the specified payload.
   *
   * @throws IllegalStateException if {@link #close(int, String)} has already been called.
   */
  public void sendPong(Buffer payload) throws IOException {
    if (writerSentClose) throw new IllegalStateException("closed");
    writer.writePong(payload);
  }

  /**
   * Writes a close frame and, if the reader has already seen the peer's close frame, tears down
   * the connection.
   *
   * @throws IllegalStateException if this method has already been called.
   * @throws IOException if writing the close frame or tearing down the connection fails.
   */
  @Override public void close(int code, String reason) throws IOException {
    boolean closeConnection;
    synchronized (closeLock) {
      // Check and set under the same lock so that concurrent callers cannot both pass the guard
      // and each write a close frame.
      if (writerSentClose) throw new IllegalStateException("closed");
      writerSentClose = true;

      // If the reader has also indicated a desire to close we will close the connection.
      closeConnection = readerSentClose;
    }

    try {
      writer.writeClose(code, reason);
    } catch (IOException e) {
      if (closeConnection) {
        // Still tear down the connection, but do not let a tear-down failure mask the write error.
        closeConnectionQuietly();
      }
      throw e;
    }

    if (closeConnection) {
      closeConnectionOnce();
    }
  }

  /** Replies and closes this web socket when a close frame is read from the peer. */
  private void peerClose(int code, String reason, boolean writeCloseResponse) {
    if (writeCloseResponse) {
      try {
        writer.writeClose(code, reason);
      } catch (IOException ignored) {
        // Best effort: the connection is torn down below regardless of whether the reply was sent.
      }
    }

    closeConnectionQuietly();

    listener.onClose(code, reason);
  }

  /** Called on the reader thread when an error occurs. */
  private void readerErrorClose(IOException e) {
    boolean writeCloseResponse;
    synchronized (closeLock) {
      readerSentClose = true;

      // If the writer has not closed we may still write a close frame of our own.
      writeCloseResponse = !writerSentClose;
    }

    if (writeCloseResponse && e instanceof ProtocolException) {
      // For protocol exceptions, try to inform the peer of such.
      try {
        writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
      } catch (IOException ignored) {
        // Best effort: the original failure is what gets reported to the listener below.
      }
    }

    closeConnectionQuietly();

    listener.onFailure(e);
  }

  /**
   * Invokes {@link #closeConnection()} unless it has already been invoked. The flag is claimed
   * under {@link #closeLock} but the subclass callback runs outside of it.
   */
  private void closeConnectionOnce() throws IOException {
    synchronized (closeLock) {
      if (connectionClosed) return;
      connectionClosed = true;
    }
    closeConnection();
  }

  /** Like {@link #closeConnectionOnce()} but discards any failure from the tear-down. */
  private void closeConnectionQuietly() {
    try {
      closeConnectionOnce();
    } catch (IOException ignored) {
      // Nothing useful can be done if tear-down fails; callers report their own outcome.
    }
  }

  /**
   * Perform any tear-down work on the connection (close the socket, recycle, etc.). This class
   * invokes it at most once per instance.
   */
  protected abstract void closeConnection() throws IOException;
}