Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.base;
     34 
     35 import com.jme3.network.Filter;
     36 import com.jme3.network.HostedConnection;
     37 import com.jme3.network.Message;
     38 import com.jme3.network.MessageListener;
     39 import com.jme3.network.kernel.Endpoint;
     40 import com.jme3.network.kernel.EndpointEvent;
     41 import com.jme3.network.kernel.Envelope;
     42 import com.jme3.network.kernel.Kernel;
     43 import com.jme3.network.message.ClientRegistrationMessage;
     44 import java.nio.ByteBuffer;
     45 import java.util.Map;
     46 import java.util.concurrent.ConcurrentHashMap;
     47 import java.util.concurrent.atomic.AtomicBoolean;
     48 import java.util.logging.Level;
     49 import java.util.logging.Logger;
     50 
     51 /**
     52  *  Wraps a single Kernel and forwards new messages
     53  *  to the supplied message dispatcher and new endpoint
     54  *  events to the connection dispatcher.  This is used
     55  *  by DefaultServer to manage its kernel objects.
     56  *
     57  *  <p>This adapter assumes a simple protocol where two
     58  *  bytes define a (short) object size with the object data
     59  *  to follow.  Note: this limits the size of serialized
     60  *  objects to 32676 bytes... even though, for example,
     61  *  datagram packets can hold twice that. :P</p>
     62  *
     63  *  @version   $Revision: 8944 $
     64  *  @author    Paul Speed
     65  */
     66 public class KernelAdapter extends Thread
     67 {
     68     static Logger log = Logger.getLogger(KernelAdapter.class.getName());
     69 
     70     private DefaultServer server; // this is unfortunate
     71     private Kernel kernel;
     72     private MessageListener<HostedConnection> messageDispatcher;
     73     private AtomicBoolean go = new AtomicBoolean(true);
     74 
     75     // Keeps track of the in-progress messages that are received
     76     // on reliable connections
     77     private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
     78 
     79     // Marks the messages as reliable or not if they came
     80     // through this connector.
     81     private boolean reliable;
     82 
     83     public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
     84                           boolean reliable )
     85     {
     86         super( String.valueOf(kernel) );
     87         this.server = server;
     88         this.kernel = kernel;
     89         this.messageDispatcher = messageDispatcher;
     90         this.reliable = reliable;
     91         setDaemon(true);
     92     }
     93 
     94     public Kernel getKernel()
     95     {
     96         return kernel;
     97     }
     98 
     99     public void initialize()
    100     {
    101         kernel.initialize();
    102     }
    103 
    104     public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
    105                            boolean copy )
    106     {
    107         kernel.broadcast( filter, data, reliable, copy );
    108     }
    109 
    110     public void close() throws InterruptedException
    111     {
    112         go.set(false);
    113 
    114         // Kill the kernel
    115         kernel.terminate();
    116     }
    117 
    118     protected void reportError( Endpoint p, Object context, Exception e )
    119     {
    120         // Should really be queued up so the outer thread can
    121         // retrieve them.  For now we'll just log it.  FIXME
    122         log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
    123 
    124         // In lieu of other options, at least close the endpoint
    125         p.close();
    126     }
    127 
    128     protected HostedConnection getConnection( Endpoint p )
    129     {
    130         return server.getConnection(p);
    131     }
    132 
    133     protected void connectionClosed( Endpoint p )
    134     {
    135         // Remove any message buffer we've been accumulating
    136         // on behalf of this endpoing
    137         messageBuffers.remove(p);
    138 
    139         log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );
    140 
    141         server.connectionClosed(p);
    142     }
    143 
    144     /**
    145      *  Note on threading for those writing their own server
    146      *  or adapter implementations.  The rule that a single connection be
    147      *  processed by only one thread at a time is more about ensuring that
    148      *  the messages are delivered in the order that they are received
    149      *  than for any user-code safety.  99% of the time the user code should
    150      *  be writing for multithreaded access anyway.
    151      *
    152      *  <p>The issue with the messages is that if a an implementation is
    153      *  using a general thread pool then it would be possible for a
    154      *  naive implementation to have one thread grab an Envelope from
    155      *  connection 1's and another grab the next Envelope.  Since an Envelope
    156      *  may contain several messages, delivering the second thread's messages
    157      *  before or during the first's would be really confusing and hard
    158      *  to code for in user code.</p>
    159      *
    160      *  <p>And that's why this note is here.  DefaultServer does a rudimentary
    161      *  per-connection locking but it couldn't possibly guard against
    162      *  out of order Envelope processing.</p>
    163      */
    164     protected void dispatch( Endpoint p, Message m )
    165     {
    166         // Because this class is the only one with the information
    167         // to do it... we need to pull of the registration message
    168         // here.
    169         if( m instanceof ClientRegistrationMessage ) {
    170             server.registerClient( this, p, (ClientRegistrationMessage)m );
    171             return;
    172         }
    173 
    174         try {
    175             HostedConnection source = getConnection(p);
    176             if( source == null ) {
    177                 if( reliable ) {
    178                     // If it's a reliable connection then it's slightly more
    179                     // concerning but this can happen all the time for a UDP endpoint.
    180                     log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + "  message:" + m );
    181                 }
    182                 return;
    183             }
    184             messageDispatcher.messageReceived( source, m );
    185         } catch( Exception e ) {
    186             reportError(p, m, e);
    187         }
    188     }
    189 
    190     protected MessageProtocol getMessageBuffer( Endpoint p )
    191     {
    192         if( !reliable ) {
    193             // Since UDP comes in packets and they aren't split
    194             // up, there is no reason to buffer.  In fact, there would
    195             // be a down side because there is no way for us to reliably
    196             // clean these up later since we'd create another one for
    197             // any random UDP packet that comes to the port.
    198             return new MessageProtocol();
    199         } else {
    200             // See if we already have one
    201             MessageProtocol result = messageBuffers.get(p);
    202             if( result == null ) {
    203                 result = new MessageProtocol();
    204                 messageBuffers.put(p, result);
    205             }
    206             return result;
    207         }
    208     }
    209 
    210     protected void createAndDispatch( Envelope env )
    211     {
    212         MessageProtocol protocol = getMessageBuffer(env.getSource());
    213 
    214         byte[] data = env.getData();
    215         ByteBuffer buffer = ByteBuffer.wrap(data);
    216 
    217         int count = protocol.addBuffer( buffer );
    218         if( count == 0 ) {
    219             // This can happen if there was only a partial message
    220             // received.  However, this should never happen for unreliable
    221             // connections.
    222             if( !reliable ) {
    223                 // Log some additional information about the packet.
    224                 int len = Math.min( 10, data.length );
    225                 StringBuilder sb = new StringBuilder();
    226                 for( int i = 0; i < len; i++ ) {
    227                     sb.append( "[" + Integer.toHexString(data[i]) + "]" );
    228                 }
    229                 log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
    230                 throw new RuntimeException( "Envelope contained incomplete data:" + env );
    231             }
    232         }
    233 
    234         // Should be complete... and maybe we should check but we don't
    235         Message m = null;
    236         while( (m = protocol.getMessage()) != null ) {
    237             m.setReliable(reliable);
    238             dispatch( env.getSource(), m );
    239         }
    240     }
    241 
    242     protected void createAndDispatch( EndpointEvent event )
    243     {
    244         // Only need to tell the server about disconnects
    245         if( event.getType() == EndpointEvent.Type.REMOVE ) {
    246             connectionClosed( event.getEndpoint() );
    247         }
    248     }
    249 
    250     protected void flushEvents()
    251     {
    252         EndpointEvent event;
    253         while( (event = kernel.nextEvent()) != null ) {
    254             try {
    255                 createAndDispatch( event );
    256             } catch( Exception e ) {
    257                 reportError(event.getEndpoint(), event, e);
    258             }
    259         }
    260     }
    261 
    262     public void run()
    263     {
    264         while( go.get() ) {
    265 
    266             try {
    267                 // Check for pending events
    268                 flushEvents();
    269 
    270                 // Grab the next envelope
    271                 Envelope e = kernel.read();
    272                 if( e == Kernel.EVENTS_PENDING )
    273                     continue; // We'll catch it up above
    274 
    275                 // Check for pending events that might have
    276                 // come in while we were blocking.  This is usually
    277                 // when the connection add events come through
    278                 flushEvents();
    279 
    280                 try {
    281                     createAndDispatch( e );
    282                 } catch( Exception ex ) {
    283                     reportError(e.getSource(), e, ex);
    284                 }
    285 
    286             } catch( InterruptedException ex ) {
    287                 if( !go.get() )
    288                     return;
    289                 throw new RuntimeException( "Unexpected interruption", ex );
    290             }
    291         }
    292     }
    293 
    294 }
    295 
    296 
    297