1 /* 2 * Copyright (c) 2011 jMonkeyEngine 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: 8 * 9 * * Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors 17 * may be used to endorse or promote products derived from this software 18 * without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 package com.jme3.network.base; 34 35 import com.jme3.network.Filter; 36 import com.jme3.network.HostedConnection; 37 import com.jme3.network.Message; 38 import com.jme3.network.MessageListener; 39 import com.jme3.network.kernel.Endpoint; 40 import com.jme3.network.kernel.EndpointEvent; 41 import com.jme3.network.kernel.Envelope; 42 import com.jme3.network.kernel.Kernel; 43 import com.jme3.network.message.ClientRegistrationMessage; 44 import java.nio.ByteBuffer; 45 import java.util.Map; 46 import java.util.concurrent.ConcurrentHashMap; 47 import java.util.concurrent.atomic.AtomicBoolean; 48 import java.util.logging.Level; 49 import java.util.logging.Logger; 50 51 /** 52 * Wraps a single Kernel and forwards new messages 53 * to the supplied message dispatcher and new endpoint 54 * events to the connection dispatcher. This is used 55 * by DefaultServer to manage its kernel objects. 56 * 57 * <p>This adapter assumes a simple protocol where two 58 * bytes define a (short) object size with the object data 59 * to follow. Note: this limits the size of serialized 60 * objects to 32676 bytes... even though, for example, 61 * datagram packets can hold twice that. :P</p> 62 * 63 * @version $Revision: 8944 $ 64 * @author Paul Speed 65 */ 66 public class KernelAdapter extends Thread 67 { 68 static Logger log = Logger.getLogger(KernelAdapter.class.getName()); 69 70 private DefaultServer server; // this is unfortunate 71 private Kernel kernel; 72 private MessageListener<HostedConnection> messageDispatcher; 73 private AtomicBoolean go = new AtomicBoolean(true); 74 75 // Keeps track of the in-progress messages that are received 76 // on reliable connections 77 private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>(); 78 79 // Marks the messages as reliable or not if they came 80 // through this connector. 81 private boolean reliable; 82 83 public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher, 84 boolean reliable ) 85 { 86 super( String.valueOf(kernel) ); 87 this.server = server; 88 this.kernel = kernel; 89 this.messageDispatcher = messageDispatcher; 90 this.reliable = reliable; 91 setDaemon(true); 92 } 93 94 public Kernel getKernel() 95 { 96 return kernel; 97 } 98 99 public void initialize() 100 { 101 kernel.initialize(); 102 } 103 104 public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, 105 boolean copy ) 106 { 107 kernel.broadcast( filter, data, reliable, copy ); 108 } 109 110 public void close() throws InterruptedException 111 { 112 go.set(false); 113 114 // Kill the kernel 115 kernel.terminate(); 116 } 117 118 protected void reportError( Endpoint p, Object context, Exception e ) 119 { 120 // Should really be queued up so the outer thread can 121 // retrieve them. For now we'll just log it. FIXME 122 log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e ); 123 124 // In lieu of other options, at least close the endpoint 125 p.close(); 126 } 127 128 protected HostedConnection getConnection( Endpoint p ) 129 { 130 return server.getConnection(p); 131 } 132 133 protected void connectionClosed( Endpoint p ) 134 { 135 // Remove any message buffer we've been accumulating 136 // on behalf of this endpoing 137 messageBuffers.remove(p); 138 139 log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() ); 140 141 server.connectionClosed(p); 142 } 143 144 /** 145 * Note on threading for those writing their own server 146 * or adapter implementations. The rule that a single connection be 147 * processed by only one thread at a time is more about ensuring that 148 * the messages are delivered in the order that they are received 149 * than for any user-code safety. 99% of the time the user code should 150 * be writing for multithreaded access anyway. 151 * 152 * <p>The issue with the messages is that if a an implementation is 153 * using a general thread pool then it would be possible for a 154 * naive implementation to have one thread grab an Envelope from 155 * connection 1's and another grab the next Envelope. Since an Envelope 156 * may contain several messages, delivering the second thread's messages 157 * before or during the first's would be really confusing and hard 158 * to code for in user code.</p> 159 * 160 * <p>And that's why this note is here. DefaultServer does a rudimentary 161 * per-connection locking but it couldn't possibly guard against 162 * out of order Envelope processing.</p> 163 */ 164 protected void dispatch( Endpoint p, Message m ) 165 { 166 // Because this class is the only one with the information 167 // to do it... we need to pull of the registration message 168 // here. 169 if( m instanceof ClientRegistrationMessage ) { 170 server.registerClient( this, p, (ClientRegistrationMessage)m ); 171 return; 172 } 173 174 try { 175 HostedConnection source = getConnection(p); 176 if( source == null ) { 177 if( reliable ) { 178 // If it's a reliable connection then it's slightly more 179 // concerning but this can happen all the time for a UDP endpoint. 180 log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + " message:" + m ); 181 } 182 return; 183 } 184 messageDispatcher.messageReceived( source, m ); 185 } catch( Exception e ) { 186 reportError(p, m, e); 187 } 188 } 189 190 protected MessageProtocol getMessageBuffer( Endpoint p ) 191 { 192 if( !reliable ) { 193 // Since UDP comes in packets and they aren't split 194 // up, there is no reason to buffer. In fact, there would 195 // be a down side because there is no way for us to reliably 196 // clean these up later since we'd create another one for 197 // any random UDP packet that comes to the port. 198 return new MessageProtocol(); 199 } else { 200 // See if we already have one 201 MessageProtocol result = messageBuffers.get(p); 202 if( result == null ) { 203 result = new MessageProtocol(); 204 messageBuffers.put(p, result); 205 } 206 return result; 207 } 208 } 209 210 protected void createAndDispatch( Envelope env ) 211 { 212 MessageProtocol protocol = getMessageBuffer(env.getSource()); 213 214 byte[] data = env.getData(); 215 ByteBuffer buffer = ByteBuffer.wrap(data); 216 217 int count = protocol.addBuffer( buffer ); 218 if( count == 0 ) { 219 // This can happen if there was only a partial message 220 // received. However, this should never happen for unreliable 221 // connections. 222 if( !reliable ) { 223 // Log some additional information about the packet. 224 int len = Math.min( 10, data.length ); 225 StringBuilder sb = new StringBuilder(); 226 for( int i = 0; i < len; i++ ) { 227 sb.append( "[" + Integer.toHexString(data[i]) + "]" ); 228 } 229 log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb ); 230 throw new RuntimeException( "Envelope contained incomplete data:" + env ); 231 } 232 } 233 234 // Should be complete... and maybe we should check but we don't 235 Message m = null; 236 while( (m = protocol.getMessage()) != null ) { 237 m.setReliable(reliable); 238 dispatch( env.getSource(), m ); 239 } 240 } 241 242 protected void createAndDispatch( EndpointEvent event ) 243 { 244 // Only need to tell the server about disconnects 245 if( event.getType() == EndpointEvent.Type.REMOVE ) { 246 connectionClosed( event.getEndpoint() ); 247 } 248 } 249 250 protected void flushEvents() 251 { 252 EndpointEvent event; 253 while( (event = kernel.nextEvent()) != null ) { 254 try { 255 createAndDispatch( event ); 256 } catch( Exception e ) { 257 reportError(event.getEndpoint(), event, e); 258 } 259 } 260 } 261 262 public void run() 263 { 264 while( go.get() ) { 265 266 try { 267 // Check for pending events 268 flushEvents(); 269 270 // Grab the next envelope 271 Envelope e = kernel.read(); 272 if( e == Kernel.EVENTS_PENDING ) 273 continue; // We'll catch it up above 274 275 // Check for pending events that might have 276 // come in while we were blocking. This is usually 277 // when the connection add events come through 278 flushEvents(); 279 280 try { 281 createAndDispatch( e ); 282 } catch( Exception ex ) { 283 reportError(e.getSource(), e, ex); 284 } 285 286 } catch( InterruptedException ex ) { 287 if( !go.get() ) 288 return; 289 throw new RuntimeException( "Unexpected interruption", ex ); 290 } 291 } 292 } 293 294 } 295 296 297