Home | History | Annotate | Download | only in tcp
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.kernel.tcp;
     34 
     35 import com.jme3.network.Filter;
     36 import com.jme3.network.kernel.*;
     37 import java.io.IOException;
     38 import java.net.InetAddress;
     39 import java.net.InetSocketAddress;
     40 import java.net.Socket;
     41 import java.nio.ByteBuffer;
     42 import java.nio.channels.*;
     43 import java.nio.channels.spi.SelectorProvider;
     44 import java.util.Iterator;
     45 import java.util.Map;
     46 import java.util.concurrent.ConcurrentHashMap;
     47 import java.util.concurrent.atomic.AtomicBoolean;
     48 import java.util.logging.Level;
     49 import java.util.logging.Logger;
     50 
     51 
     52 /**
     53  *  A Kernel implementation based on NIO selectors.
     54  *
     55  *  @version   $Revision: 8944 $
     56  *  @author    Paul Speed
     57  */
     58 public class SelectorKernel extends AbstractKernel
     59 {
     60     static Logger log = Logger.getLogger(SelectorKernel.class.getName());
     61 
     62     private InetSocketAddress address;
     63     private SelectorThread thread;
     64 
     65     private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>();
     66 
     67     public SelectorKernel( InetAddress host, int port )
     68     {
     69         this( new InetSocketAddress(host, port) );
     70     }
     71 
     72     public SelectorKernel( int port ) throws IOException
     73     {
     74         this( new InetSocketAddress(port) );
     75     }
     76 
     77     public SelectorKernel( InetSocketAddress address )
     78     {
     79         this.address = address;
     80     }
     81 
     82     protected SelectorThread createSelectorThread()
     83     {
     84         return new SelectorThread();
     85     }
     86 
     87     public void initialize()
     88     {
     89         if( thread != null )
     90             throw new IllegalStateException( "Kernel already initialized." );
     91 
     92         thread = createSelectorThread();
     93 
     94         try {
     95             thread.connect();
     96             thread.start();
     97         } catch( IOException e ) {
     98             throw new KernelException( "Error hosting:" + address, e );
     99         }
    100     }
    101 
    102     public void terminate() throws InterruptedException
    103     {
    104         if( thread == null )
    105             throw new IllegalStateException( "Kernel not initialized." );
    106 
    107         try {
    108             thread.close();
    109             thread = null;
    110         } catch( IOException e ) {
    111             throw new KernelException( "Error closing host connection:" + address, e );
    112         }
    113     }
    114 
    115     public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
    116                            boolean copy )
    117     {
    118         if( !reliable )
    119             throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." );
    120 
    121         if( copy ) {
    122             // Copy the data just once
    123             byte[] temp = new byte[data.remaining()];
    124             System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
    125             data = ByteBuffer.wrap(temp);
    126         }
    127 
    128         // Hand it to all of the endpoints that match our routing
    129         for( NioEndpoint p : endpoints.values() ) {
    130             // Does it match the filter?
    131             if( filter != null && !filter.apply(p) )
    132                 continue;
    133 
    134             // Give it the data... but let each endpoint track their
    135             // own completion over the shared array of bytes by
    136             // duplicating it
    137             p.send( data.duplicate(), false, false );
    138         }
    139 
    140         // Wake up the selector so it can reinitialize its
    141         // state accordingly.
    142         wakeupSelector();
    143     }
    144 
    145     protected NioEndpoint addEndpoint( SocketChannel c )
    146     {
    147         // Note: we purposely do NOT put the key in the endpoint.
    148         //       SelectionKeys are dangerous outside the selector thread
    149         //       and this is safer.
    150         NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c );
    151 
    152         endpoints.put( p.getId(), p );
    153 
    154         // Enqueue an endpoint event for the listeners
    155         addEvent( EndpointEvent.createAdd( this, p ) );
    156 
    157         return p;
    158     }
    159 
    160     protected void removeEndpoint( NioEndpoint p, SocketChannel c )
    161     {
    162         endpoints.remove( p.getId() );
    163         log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() );
    164 
    165         // Enqueue an endpoint event for the listeners
    166         addEvent( EndpointEvent.createRemove( this, p ) );
    167 
    168         // If there are no pending messages then add one so that the
    169         // kernel-user knows to wake up if it is only listening for
    170         // envelopes.
    171         if( !hasEnvelopes() ) {
    172             // Note: this is not really a race condition.  At worst, our
    173             // event has already been handled by now and it does no harm
    174             // to check again.
    175             addEnvelope( EVENTS_PENDING );
    176         }
    177     }
    178 
    179     /**
    180      *  Called by the endpoints when they need to be closed.
    181      */
    182     protected void closeEndpoint( NioEndpoint p ) throws IOException
    183     {
    184         //log.log( Level.INFO, "Closing endpoint:{0}.", p );
    185 
    186         thread.cancel(p);
    187     }
    188 
    189     /**
    190      *  Used internally by the endpoints to wakeup the selector
    191      *  when they have data to send.
    192      */
    193     protected void wakeupSelector()
    194     {
    195         thread.wakeupSelector();
    196     }
    197 
    198     protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size )
    199     {
    200         // Note: if ever desirable, it would be possible to accumulate
    201         //       data per source channel and only 'finalize' it when
    202         //       asked for more envelopes then were ready.  I just don't
    203         //       think it will be an issue in practice.  The busier the
    204         //       server, the more the buffers will fill before we get to them.
    205         //       And if the server isn't busy, who cares if we chop things up
    206         //       smaller... the network is still likely to deliver things in
    207         //       bulk anyway.
    208 
    209         // Must copy the shared data before we use it
    210         byte[] dataCopy = new byte[size];
    211 		System.arraycopy(shared.array(), 0, dataCopy, 0, size);
    212 
    213         Envelope env = new Envelope( p, dataCopy, true );
    214         addEnvelope( env );
    215     }
    216 
    217     /**
    218      *  This class is purposely tucked neatly away because
    219      *  messing with the selector from other threads for any
    220      *  reason is very bad.  This is the safest architecture.
    221      */
    222     protected class SelectorThread extends Thread
    223     {
    224         private ServerSocketChannel serverChannel;
    225         private Selector selector;
    226         private AtomicBoolean go = new AtomicBoolean(true);
    227         private ByteBuffer working = ByteBuffer.allocate( 8192 );
    228 
    229         /**
    230          *  Because we want to keep the keys to ourselves, we'll do
    231          *  the endpoint -> key mapping internally.
    232          */
    233         private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>();
    234 
    235         public SelectorThread()
    236         {
    237             setName( "Selector@" + address );
    238             setDaemon(true);
    239         }
    240 
    241         public void connect() throws IOException
    242         {
    243             // Create a new selector
    244             this.selector = SelectorProvider.provider().openSelector();
    245 
    246             // Create a new non-blocking server socket channel
    247             this.serverChannel = ServerSocketChannel.open();
    248             serverChannel.configureBlocking(false);
    249 
    250             // Bind the server socket to the specified address and port
    251             serverChannel.socket().bind(address);
    252 
    253             // Register the server socket channel, indicating an interest in
    254             // accepting new connections
    255             serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    256 
    257             log.log( Level.INFO, "Hosting TCP connection:{0}.", address );
    258         }
    259 
    260         public void close() throws IOException, InterruptedException
    261         {
    262             // Set the thread to stop
    263             go.set(false);
    264 
    265             // Make sure the channel is closed
    266             serverChannel.close();
    267 
    268             // Force the selector to stop blocking
    269             wakeupSelector();
    270 
    271             // And wait for it
    272             join();
    273         }
    274 
    275         protected void wakeupSelector()
    276         {
    277             selector.wakeup();
    278         }
    279 
    280         protected void setupSelectorOptions()
    281         {
    282             // For now, selection keys will either be in OP_READ
    283             // or OP_WRITE.  So while we are writing a buffer, we
    284             // will not be reading.  This is way simpler and less
    285             // error prone... it can always be changed when everything
    286             // else works if we are looking to micro-optimize.
    287 
    288             // Setup options based on the current state of
    289             // the endpoints.  This could potentially be more
    290             // efficiently done as change requests... or simply
    291             // keeping a thread-safe set of endpoints with pending
    292             // writes.  For most cases, it shouldn't matter.
    293             for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
    294                 if( e.getKey().hasPending() ) {
    295                     e.getValue().interestOps(SelectionKey.OP_WRITE);
    296                 }
    297             }
    298         }
    299 
    300         protected void accept( SelectionKey key ) throws IOException
    301         {
    302             // Would only get accepts on a server channel
    303             ServerSocketChannel serverChan = (ServerSocketChannel)key.channel();
    304 
    305             // Setup the connection to be non-blocking
    306             SocketChannel remoteChan = serverChan.accept();
    307             remoteChan.configureBlocking(false);
    308 
    309             // And disable Nagle's buffering algorithm... we want
    310             // data to go when we put it there.
    311             Socket sock = remoteChan.socket();
    312             sock.setTcpNoDelay(true);
    313 
    314             // Let the selector know we're interested in reading
    315             // data from the channel
    316             SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ );
    317 
    318             // And now create a new endpoint
    319             NioEndpoint p = addEndpoint( remoteChan );
    320             endKey.attach(p);
    321             endpointKeys.put(p, endKey);
    322         }
    323 
    324         protected void cancel( NioEndpoint p ) throws IOException
    325         {
    326             SelectionKey key = endpointKeys.remove(p);
    327             if( key == null ) {
    328                 //log.log( Level.INFO, "Endpoint already closed:{0}.", p );
    329                 return;  // already closed it
    330             }
    331             log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
    332 
    333             log.log( Level.INFO, "Closing endpoint:{0}.", p );
    334             SocketChannel c = (SocketChannel)key.channel();
    335 
    336             // Note: key.cancel() is specifically thread safe.  One of
    337             //       the few things one can do with a key from another
    338             //       thread.
    339             key.cancel();
    340             c.close();
    341             removeEndpoint( p, c );
    342         }
    343 
    344         protected void cancel( SelectionKey key, SocketChannel c ) throws IOException
    345         {
    346             NioEndpoint p = (NioEndpoint)key.attachment();
    347             log.log( Level.INFO, "Closing channel endpoint:{0}.", p );
    348             Object o = endpointKeys.remove(p);
    349 
    350             log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
    351 
    352             key.cancel();
    353             c.close();
    354             removeEndpoint( p, c );
    355         }
    356 
    357         protected void read( SelectionKey key ) throws IOException
    358         {
    359             NioEndpoint p = (NioEndpoint)key.attachment();
    360             SocketChannel c = (SocketChannel)key.channel();
    361             working.clear();
    362 
    363             int size;
    364             try {
    365                 size = c.read(working);
    366             } catch( IOException e ) {
    367                 // The remove end forcibly closed the connection...
    368                 // close out our end and cancel the key
    369                 cancel( key, c );
    370                 return;
    371             }
    372 
    373             if( size == -1 ) {
    374                 // The remote end shut down cleanly...
    375                 // close out our end and cancel the key
    376                 cancel( key, c );
    377                 return;
    378             }
    379 
    380             newData( p, c, working, size );
    381         }
    382 
    383         protected void write( SelectionKey key ) throws IOException
    384         {
    385             NioEndpoint p = (NioEndpoint)key.attachment();
    386             SocketChannel c = (SocketChannel)key.channel();
    387 
    388             // We will send what we can and move on.
    389             ByteBuffer current = p.peekPending();
    390             if( current == NioEndpoint.CLOSE_MARKER ) {
    391                 // This connection wants to be closed now
    392                 closeEndpoint(p);
    393 
    394                 // Nothing more to do
    395                 return;
    396             }
    397 
    398             c.write( current );
    399 
    400             // If we wrote all of that packet then we need to remove it
    401             if( current.remaining() == 0 ) {
    402                 p.removePending();
    403             }
    404 
    405             // If we happened to empty the pending queue then let's read
    406             // again.
    407             if( !p.hasPending() ) {
    408                 key.interestOps( SelectionKey.OP_READ );
    409             }
    410         }
    411 
    412         protected void select() throws IOException
    413         {
    414             selector.select();
    415 
    416             for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) {
    417                 SelectionKey key = i.next();
    418                 i.remove();
    419 
    420                 if( !key.isValid() )
    421                     {
    422                     // When does this happen?
    423                     log.log( Level.INFO, "Key is not valid:{0}.", key );
    424                     continue;
    425                     }
    426 
    427                 try {
    428                     if( key.isAcceptable() )
    429                         accept(key);
    430                     else if( key.isWritable() )
    431                         write(key);
    432                     else if( key.isReadable() )
    433                         read(key);
    434                 } catch( IOException e ) {
    435                     if( !go.get() )
    436                         return;  // error likely due to shutting down
    437                     reportError( e );
    438 
    439                     // And at this level, errors likely mean the key is now
    440                     // dead and it doesn't hurt to kick them anyway.  If we
    441                     // find IOExceptions that are not fatal, this can be
    442                     // readdressed
    443                     cancel( key, (SocketChannel)key.channel() );
    444                 }
    445             }
    446         }
    447 
    448         public void run()
    449         {
    450             log.log( Level.INFO, "Kernel started for connection:{0}.", address );
    451 
    452             // An atomic is safest and costs almost nothing
    453             while( go.get() ) {
    454                 // Setup any queued option changes
    455                 setupSelectorOptions();
    456 
    457                 // Check for available keys and process them
    458                 try {
    459                     select();
    460                 } catch( ClosedSelectorException e ) {
    461                     if( !go.get() )
    462                         return;  // it's because we're shutting down
    463                     throw new KernelException( "Premature selector closing", e );
    464                 } catch( IOException e ) {
    465                     if( !go.get() )
    466                         return;  // error likely due to shutting down
    467                     reportError( e );
    468                 }
    469             }
    470         }
    471     }
    472 }
    473