1 /* 2 * Copyright (c) 2011 jMonkeyEngine 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: 8 * 9 * * Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors 17 * may be used to endorse or promote products derived from this software 18 * without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 package com.jme3.network.base; 34 35 import com.jme3.network.ErrorListener; 36 import com.jme3.network.Message; 37 import com.jme3.network.MessageListener; 38 import com.jme3.network.kernel.Connector; 39 import com.jme3.network.kernel.ConnectorException; 40 import java.nio.ByteBuffer; 41 import java.util.concurrent.ArrayBlockingQueue; 42 import java.util.concurrent.BlockingQueue; 43 import java.util.concurrent.atomic.AtomicBoolean; 44 45 /** 46 * Wraps a single Connector and forwards new messages 47 * to the supplied message dispatcher. This is used 48 * by DefaultClient to manage its connector objects. 49 * This is only responsible for message reading and provides 50 * no support for buffering writes. 51 * 52 * <p>This adapter assumes a simple protocol where two 53 * bytes define a (short) object size with the object data 54 * to follow. Note: this limits the size of serialized 55 * objects to 32676 bytes... even though, for example, 56 * datagram packets can hold twice that. :P</p> 57 * 58 * @version $Revision: 8944 $ 59 * @author Paul Speed 60 */ 61 public class ConnectorAdapter extends Thread 62 { 63 private static final int OUTBOUND_BACKLOG = 16000; 64 65 private Connector connector; 66 private MessageListener<Object> dispatcher; 67 private ErrorListener<Object> errorHandler; 68 private AtomicBoolean go = new AtomicBoolean(true); 69 70 private BlockingQueue<ByteBuffer> outbound; 71 72 // Writes messages out on a background thread 73 private WriterThread writer; 74 75 // Marks the messages as reliable or not if they came 76 // through this connector. 77 private boolean reliable; 78 79 public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher, 80 ErrorListener<Object> errorHandler, boolean reliable ) 81 { 82 super( String.valueOf(connector) ); 83 this.connector = connector; 84 this.dispatcher = dispatcher; 85 this.errorHandler = errorHandler; 86 this.reliable = reliable; 87 setDaemon(true); 88 89 // The backlog makes sure that the outbound channel blocks once 90 // a certain backlog level is reached. It is set high so that it 91 // is only reached in the worst cases... which are usually things like 92 // raw throughput tests. Technically, a saturated TCP channel could 93 // back up quite a bit if the buffers are full and the socket has 94 // stalled but 16,000 messages is still a big backlog. 95 outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG); 96 97 // Note: this technically adds a potential deadlock case 98 // with the above code where there wasn't one before. For example, 99 // if a TCP outbound queue fills to capacity and a client sends 100 // in such a way that they block TCP message handling then if the HostedConnection 101 // on the server is similarly blocked then the TCP network buffers may 102 // all get full and no outbound messages move and we forever block 103 // on the queue. 104 // However, in practice this can't really happen... or at least it's 105 // the sign of other really bad things. 106 // First, currently the server-side outbound queues are all unbounded and 107 // so won't ever block the handling of messages if the outbound channel is full. 108 // Second, there would have to be a huge amount of data backlog for this 109 // to ever occur anyway. 110 // Third, it's a sign of a really poor architecture if 16,000 messages 111 // can go out in a way that blocks reads. 112 113 writer = new WriterThread(); 114 writer.start(); 115 } 116 117 public void close() 118 { 119 go.set(false); 120 121 // Kill the writer service 122 writer.shutdown(); 123 124 if( connector.isConnected() ) 125 { 126 // Kill the connector 127 connector.close(); 128 } 129 } 130 131 protected void dispatch( Message m ) 132 { 133 dispatcher.messageReceived( null, m ); 134 } 135 136 public void write( ByteBuffer data ) 137 { 138 try { 139 outbound.put( data ); 140 } catch( InterruptedException e ) { 141 throw new RuntimeException( "Interrupted while waiting for queue to drain", e ); 142 } 143 } 144 145 protected void handleError( Exception e ) 146 { 147 if( !go.get() ) 148 return; 149 150 errorHandler.handleError( this, e ); 151 } 152 153 public void run() 154 { 155 MessageProtocol protocol = new MessageProtocol(); 156 157 try { 158 while( go.get() ) { 159 ByteBuffer buffer = connector.read(); 160 if( buffer == null ) { 161 if( go.get() ) { 162 throw new ConnectorException( "Connector closed." ); 163 } else { 164 // Just dump out because a null buffer is expected 165 // from a closed/closing connector 166 break; 167 } 168 } 169 170 protocol.addBuffer( buffer ); 171 172 Message m = null; 173 while( (m = protocol.getMessage()) != null ) { 174 m.setReliable( reliable ); 175 dispatch( m ); 176 } 177 } 178 } catch( Exception e ) { 179 handleError( e ); 180 } 181 } 182 183 protected class WriterThread extends Thread 184 { 185 public WriterThread() 186 { 187 super( String.valueOf(connector) + "-writer" ); 188 } 189 190 public void shutdown() 191 { 192 interrupt(); 193 } 194 195 private void write( ByteBuffer data ) 196 { 197 try { 198 connector.write(data); 199 } catch( Exception e ) { 200 handleError( e ); 201 } 202 } 203 204 public void run() 205 { 206 while( go.get() ) { 207 try { 208 ByteBuffer data = outbound.take(); 209 write(data); 210 } catch( InterruptedException e ) { 211 if( !go.get() ) 212 return; 213 throw new RuntimeException( "Interrupted waiting for data", e ); 214 } 215 } 216 } 217 } 218 } 219