1 /* 2 * Copyright (c) 2011 jMonkeyEngine 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: 8 * 9 * * Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors 17 * may be used to endorse or promote products derived from this software 18 * without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 package com.jme3.network.kernel.tcp; 34 35 import com.jme3.network.Filter; 36 import com.jme3.network.kernel.*; 37 import java.io.IOException; 38 import java.net.InetAddress; 39 import java.net.InetSocketAddress; 40 import java.net.Socket; 41 import java.nio.ByteBuffer; 42 import java.nio.channels.*; 43 import java.nio.channels.spi.SelectorProvider; 44 import java.util.Iterator; 45 import java.util.Map; 46 import java.util.concurrent.ConcurrentHashMap; 47 import java.util.concurrent.atomic.AtomicBoolean; 48 import java.util.logging.Level; 49 import java.util.logging.Logger; 50 51 52 /** 53 * A Kernel implementation based on NIO selectors. 54 * 55 * @version $Revision: 8944 $ 56 * @author Paul Speed 57 */ 58 public class SelectorKernel extends AbstractKernel 59 { 60 static Logger log = Logger.getLogger(SelectorKernel.class.getName()); 61 62 private InetSocketAddress address; 63 private SelectorThread thread; 64 65 private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>(); 66 67 public SelectorKernel( InetAddress host, int port ) 68 { 69 this( new InetSocketAddress(host, port) ); 70 } 71 72 public SelectorKernel( int port ) throws IOException 73 { 74 this( new InetSocketAddress(port) ); 75 } 76 77 public SelectorKernel( InetSocketAddress address ) 78 { 79 this.address = address; 80 } 81 82 protected SelectorThread createSelectorThread() 83 { 84 return new SelectorThread(); 85 } 86 87 public void initialize() 88 { 89 if( thread != null ) 90 throw new IllegalStateException( "Kernel already initialized." ); 91 92 thread = createSelectorThread(); 93 94 try { 95 thread.connect(); 96 thread.start(); 97 } catch( IOException e ) { 98 throw new KernelException( "Error hosting:" + address, e ); 99 } 100 } 101 102 public void terminate() throws InterruptedException 103 { 104 if( thread == null ) 105 throw new IllegalStateException( "Kernel not initialized." ); 106 107 try { 108 thread.close(); 109 thread = null; 110 } catch( IOException e ) { 111 throw new KernelException( "Error closing host connection:" + address, e ); 112 } 113 } 114 115 public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, 116 boolean copy ) 117 { 118 if( !reliable ) 119 throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." ); 120 121 if( copy ) { 122 // Copy the data just once 123 byte[] temp = new byte[data.remaining()]; 124 System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); 125 data = ByteBuffer.wrap(temp); 126 } 127 128 // Hand it to all of the endpoints that match our routing 129 for( NioEndpoint p : endpoints.values() ) { 130 // Does it match the filter? 131 if( filter != null && !filter.apply(p) ) 132 continue; 133 134 // Give it the data... but let each endpoint track their 135 // own completion over the shared array of bytes by 136 // duplicating it 137 p.send( data.duplicate(), false, false ); 138 } 139 140 // Wake up the selector so it can reinitialize its 141 // state accordingly. 142 wakeupSelector(); 143 } 144 145 protected NioEndpoint addEndpoint( SocketChannel c ) 146 { 147 // Note: we purposely do NOT put the key in the endpoint. 148 // SelectionKeys are dangerous outside the selector thread 149 // and this is safer. 150 NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c ); 151 152 endpoints.put( p.getId(), p ); 153 154 // Enqueue an endpoint event for the listeners 155 addEvent( EndpointEvent.createAdd( this, p ) ); 156 157 return p; 158 } 159 160 protected void removeEndpoint( NioEndpoint p, SocketChannel c ) 161 { 162 endpoints.remove( p.getId() ); 163 log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() ); 164 165 // Enqueue an endpoint event for the listeners 166 addEvent( EndpointEvent.createRemove( this, p ) ); 167 168 // If there are no pending messages then add one so that the 169 // kernel-user knows to wake up if it is only listening for 170 // envelopes. 171 if( !hasEnvelopes() ) { 172 // Note: this is not really a race condition. At worst, our 173 // event has already been handled by now and it does no harm 174 // to check again. 175 addEnvelope( EVENTS_PENDING ); 176 } 177 } 178 179 /** 180 * Called by the endpoints when they need to be closed. 181 */ 182 protected void closeEndpoint( NioEndpoint p ) throws IOException 183 { 184 //log.log( Level.INFO, "Closing endpoint:{0}.", p ); 185 186 thread.cancel(p); 187 } 188 189 /** 190 * Used internally by the endpoints to wakeup the selector 191 * when they have data to send. 192 */ 193 protected void wakeupSelector() 194 { 195 thread.wakeupSelector(); 196 } 197 198 protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size ) 199 { 200 // Note: if ever desirable, it would be possible to accumulate 201 // data per source channel and only 'finalize' it when 202 // asked for more envelopes then were ready. I just don't 203 // think it will be an issue in practice. The busier the 204 // server, the more the buffers will fill before we get to them. 205 // And if the server isn't busy, who cares if we chop things up 206 // smaller... the network is still likely to deliver things in 207 // bulk anyway. 208 209 // Must copy the shared data before we use it 210 byte[] dataCopy = new byte[size]; 211 System.arraycopy(shared.array(), 0, dataCopy, 0, size); 212 213 Envelope env = new Envelope( p, dataCopy, true ); 214 addEnvelope( env ); 215 } 216 217 /** 218 * This class is purposely tucked neatly away because 219 * messing with the selector from other threads for any 220 * reason is very bad. This is the safest architecture. 221 */ 222 protected class SelectorThread extends Thread 223 { 224 private ServerSocketChannel serverChannel; 225 private Selector selector; 226 private AtomicBoolean go = new AtomicBoolean(true); 227 private ByteBuffer working = ByteBuffer.allocate( 8192 ); 228 229 /** 230 * Because we want to keep the keys to ourselves, we'll do 231 * the endpoint -> key mapping internally. 232 */ 233 private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>(); 234 235 public SelectorThread() 236 { 237 setName( "Selector@" + address ); 238 setDaemon(true); 239 } 240 241 public void connect() throws IOException 242 { 243 // Create a new selector 244 this.selector = SelectorProvider.provider().openSelector(); 245 246 // Create a new non-blocking server socket channel 247 this.serverChannel = ServerSocketChannel.open(); 248 serverChannel.configureBlocking(false); 249 250 // Bind the server socket to the specified address and port 251 serverChannel.socket().bind(address); 252 253 // Register the server socket channel, indicating an interest in 254 // accepting new connections 255 serverChannel.register(selector, SelectionKey.OP_ACCEPT); 256 257 log.log( Level.INFO, "Hosting TCP connection:{0}.", address ); 258 } 259 260 public void close() throws IOException, InterruptedException 261 { 262 // Set the thread to stop 263 go.set(false); 264 265 // Make sure the channel is closed 266 serverChannel.close(); 267 268 // Force the selector to stop blocking 269 wakeupSelector(); 270 271 // And wait for it 272 join(); 273 } 274 275 protected void wakeupSelector() 276 { 277 selector.wakeup(); 278 } 279 280 protected void setupSelectorOptions() 281 { 282 // For now, selection keys will either be in OP_READ 283 // or OP_WRITE. So while we are writing a buffer, we 284 // will not be reading. This is way simpler and less 285 // error prone... it can always be changed when everything 286 // else works if we are looking to micro-optimize. 287 288 // Setup options based on the current state of 289 // the endpoints. This could potentially be more 290 // efficiently done as change requests... or simply 291 // keeping a thread-safe set of endpoints with pending 292 // writes. For most cases, it shouldn't matter. 293 for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) { 294 if( e.getKey().hasPending() ) { 295 e.getValue().interestOps(SelectionKey.OP_WRITE); 296 } 297 } 298 } 299 300 protected void accept( SelectionKey key ) throws IOException 301 { 302 // Would only get accepts on a server channel 303 ServerSocketChannel serverChan = (ServerSocketChannel)key.channel(); 304 305 // Setup the connection to be non-blocking 306 SocketChannel remoteChan = serverChan.accept(); 307 remoteChan.configureBlocking(false); 308 309 // And disable Nagle's buffering algorithm... we want 310 // data to go when we put it there. 311 Socket sock = remoteChan.socket(); 312 sock.setTcpNoDelay(true); 313 314 // Let the selector know we're interested in reading 315 // data from the channel 316 SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ ); 317 318 // And now create a new endpoint 319 NioEndpoint p = addEndpoint( remoteChan ); 320 endKey.attach(p); 321 endpointKeys.put(p, endKey); 322 } 323 324 protected void cancel( NioEndpoint p ) throws IOException 325 { 326 SelectionKey key = endpointKeys.remove(p); 327 if( key == null ) { 328 //log.log( Level.INFO, "Endpoint already closed:{0}.", p ); 329 return; // already closed it 330 } 331 log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); 332 333 log.log( Level.INFO, "Closing endpoint:{0}.", p ); 334 SocketChannel c = (SocketChannel)key.channel(); 335 336 // Note: key.cancel() is specifically thread safe. One of 337 // the few things one can do with a key from another 338 // thread. 339 key.cancel(); 340 c.close(); 341 removeEndpoint( p, c ); 342 } 343 344 protected void cancel( SelectionKey key, SocketChannel c ) throws IOException 345 { 346 NioEndpoint p = (NioEndpoint)key.attachment(); 347 log.log( Level.INFO, "Closing channel endpoint:{0}.", p ); 348 Object o = endpointKeys.remove(p); 349 350 log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); 351 352 key.cancel(); 353 c.close(); 354 removeEndpoint( p, c ); 355 } 356 357 protected void read( SelectionKey key ) throws IOException 358 { 359 NioEndpoint p = (NioEndpoint)key.attachment(); 360 SocketChannel c = (SocketChannel)key.channel(); 361 working.clear(); 362 363 int size; 364 try { 365 size = c.read(working); 366 } catch( IOException e ) { 367 // The remove end forcibly closed the connection... 368 // close out our end and cancel the key 369 cancel( key, c ); 370 return; 371 } 372 373 if( size == -1 ) { 374 // The remote end shut down cleanly... 375 // close out our end and cancel the key 376 cancel( key, c ); 377 return; 378 } 379 380 newData( p, c, working, size ); 381 } 382 383 protected void write( SelectionKey key ) throws IOException 384 { 385 NioEndpoint p = (NioEndpoint)key.attachment(); 386 SocketChannel c = (SocketChannel)key.channel(); 387 388 // We will send what we can and move on. 389 ByteBuffer current = p.peekPending(); 390 if( current == NioEndpoint.CLOSE_MARKER ) { 391 // This connection wants to be closed now 392 closeEndpoint(p); 393 394 // Nothing more to do 395 return; 396 } 397 398 c.write( current ); 399 400 // If we wrote all of that packet then we need to remove it 401 if( current.remaining() == 0 ) { 402 p.removePending(); 403 } 404 405 // If we happened to empty the pending queue then let's read 406 // again. 407 if( !p.hasPending() ) { 408 key.interestOps( SelectionKey.OP_READ ); 409 } 410 } 411 412 protected void select() throws IOException 413 { 414 selector.select(); 415 416 for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) { 417 SelectionKey key = i.next(); 418 i.remove(); 419 420 if( !key.isValid() ) 421 { 422 // When does this happen? 423 log.log( Level.INFO, "Key is not valid:{0}.", key ); 424 continue; 425 } 426 427 try { 428 if( key.isAcceptable() ) 429 accept(key); 430 else if( key.isWritable() ) 431 write(key); 432 else if( key.isReadable() ) 433 read(key); 434 } catch( IOException e ) { 435 if( !go.get() ) 436 return; // error likely due to shutting down 437 reportError( e ); 438 439 // And at this level, errors likely mean the key is now 440 // dead and it doesn't hurt to kick them anyway. If we 441 // find IOExceptions that are not fatal, this can be 442 // readdressed 443 cancel( key, (SocketChannel)key.channel() ); 444 } 445 } 446 } 447 448 public void run() 449 { 450 log.log( Level.INFO, "Kernel started for connection:{0}.", address ); 451 452 // An atomic is safest and costs almost nothing 453 while( go.get() ) { 454 // Setup any queued option changes 455 setupSelectorOptions(); 456 457 // Check for available keys and process them 458 try { 459 select(); 460 } catch( ClosedSelectorException e ) { 461 if( !go.get() ) 462 return; // it's because we're shutting down 463 throw new KernelException( "Premature selector closing", e ); 464 } catch( IOException e ) { 465 if( !go.get() ) 466 return; // error likely due to shutting down 467 reportError( e ); 468 } 469 } 470 } 471 } 472 } 473