Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.base;
     34 
     35 import com.jme3.network.ClientStateListener.DisconnectInfo;
     36 import com.jme3.network.*;
     37 import com.jme3.network.kernel.Connector;
     38 import com.jme3.network.message.ChannelInfoMessage;
     39 import com.jme3.network.message.ClientRegistrationMessage;
     40 import com.jme3.network.message.DisconnectMessage;
     41 import java.io.IOException;
     42 import java.nio.ByteBuffer;
     43 import java.util.*;
     44 import java.util.concurrent.CopyOnWriteArrayList;
     45 import java.util.concurrent.CountDownLatch;
     46 import java.util.logging.Level;
     47 import java.util.logging.Logger;
     48 
     49 /**
     50  *  A default implementation of the Client interface that delegates
     51  *  its network connectivity to a kernel.Connector.
     52  *
     53  *  @version   $Revision: 8938 $
     54  *  @author    Paul Speed
     55  */
     56 public class DefaultClient implements Client
     57 {
     58     static Logger log = Logger.getLogger(DefaultClient.class.getName());
     59 
     60     // First two channels are reserved for reliable and
     61     // unreliable.  Note: channels are endpoint specific so these
     62     // constants and the handling need not have anything to do with
     63     // the same constants in DefaultServer... which is why they are
     64     // separate.
     65     private static final int CH_RELIABLE = 0;
     66     private static final int CH_UNRELIABLE = 1;
     67     private static final int CH_FIRST = 2;
     68 
     69     private ThreadLocal<ByteBuffer> dataBuffer = new ThreadLocal<ByteBuffer>();
     70 
     71     private int id = -1;
     72     private boolean isRunning = false;
     73     private CountDownLatch connecting = new CountDownLatch(1);
     74     private String gameName;
     75     private int version;
     76     private MessageListenerRegistry<Client> messageListeners = new MessageListenerRegistry<Client>();
     77     private List<ClientStateListener> stateListeners = new CopyOnWriteArrayList<ClientStateListener>();
     78     private List<ErrorListener<? super Client>> errorListeners = new CopyOnWriteArrayList<ErrorListener<? super Client>>();
     79     private Redispatch dispatcher = new Redispatch();
     80     private List<ConnectorAdapter> channels = new ArrayList<ConnectorAdapter>();
     81 
     82     private ConnectorFactory connectorFactory;
     83 
     84     public DefaultClient( String gameName, int version )
     85     {
     86         this.gameName = gameName;
     87         this.version = version;
     88     }
     89 
     90     public DefaultClient( String gameName, int version, Connector reliable, Connector fast,
     91                           ConnectorFactory connectorFactory )
     92     {
     93         this( gameName, version );
     94         setPrimaryConnectors( reliable, fast, connectorFactory );
     95     }
     96 
     97     protected void setPrimaryConnectors( Connector reliable, Connector fast, ConnectorFactory connectorFactory )
     98     {
     99         if( reliable == null )
    100             throw new IllegalArgumentException( "The reliable connector cannot be null." );
    101         if( isRunning )
    102             throw new IllegalStateException( "Client is already started." );
    103         if( !channels.isEmpty() )
    104             throw new IllegalStateException( "Channels already exist." );
    105 
    106         this.connectorFactory = connectorFactory;
    107         channels.add(new ConnectorAdapter(reliable, dispatcher, dispatcher, true));
    108         if( fast != null ) {
    109             channels.add(new ConnectorAdapter(fast, dispatcher, dispatcher, false));
    110         } else {
    111             // Add the null adapter to keep the indexes right
    112             channels.add(null);
    113         }
    114     }
    115 
    116     protected void checkRunning()
    117     {
    118         if( !isRunning )
    119             throw new IllegalStateException( "Client is not started." );
    120     }
    121 
    122     public void start()
    123     {
    124         if( isRunning )
    125             throw new IllegalStateException( "Client is already started." );
    126 
    127         // Start up the threads and stuff for the
    128         // connectors that we have
    129         for( ConnectorAdapter ca : channels ) {
    130             if( ca == null )
    131                 continue;
    132             ca.start();
    133         }
    134 
    135         // Send our connection message with a generated ID until
    136         // we get one back from the server.  We'll hash time in
    137         // millis and time in nanos.
    138         // This is used to match the TCP and UDP endpoints up on the
    139         // other end since they may take different routes to get there.
    140         // Behind NAT, many game clients may be coming over the same
    141         // IP address from the server's perspective and they may have
    142         // their UDP ports mapped all over the place.
    143         //
    144         // Since currentTimeMillis() is absolute time and nano time
    145         // is roughtly related to system start time, adding these two
    146         // together should be plenty unique for our purposes.  It wouldn't
    147         // hurt to reconcile with IP on the server side, though.
    148         long tempId = System.currentTimeMillis() + System.nanoTime();
    149 
    150         // Set it true here so we can send some messages.
    151         isRunning = true;
    152 
    153         ClientRegistrationMessage reg;
    154         reg = new ClientRegistrationMessage();
    155         reg.setId(tempId);
    156         reg.setGameName(getGameName());
    157         reg.setVersion(getVersion());
    158         reg.setReliable(true);
    159         send(CH_RELIABLE, reg, false);
    160 
    161         // Send registration messages to any other configured
    162         // connectors
    163         reg = new ClientRegistrationMessage();
    164         reg.setId(tempId);
    165         reg.setReliable(false);
    166         for( int ch = CH_UNRELIABLE; ch < channels.size(); ch++ ) {
    167             if( channels.get(ch) == null )
    168                 continue;
    169             send(ch, reg, false);
    170         }
    171     }
    172 
    173     protected void waitForConnected()
    174     {
    175         if( isConnected() )
    176             return;
    177 
    178         try {
    179             connecting.await();
    180         } catch( InterruptedException e ) {
    181             throw new RuntimeException( "Interrupted waiting for connect", e );
    182         }
    183     }
    184 
    185     public boolean isConnected()
    186     {
    187         return id != -1 && isRunning;
    188     }
    189 
    190     public int getId()
    191     {
    192         return id;
    193     }
    194 
    195     public String getGameName()
    196     {
    197         return gameName;
    198     }
    199 
    200     public int getVersion()
    201     {
    202         return version;
    203     }
    204 
    205     public void send( Message message )
    206     {
    207         if( message.isReliable() || channels.get(CH_UNRELIABLE) == null ) {
    208             send(CH_RELIABLE, message, true);
    209         } else {
    210             send(CH_UNRELIABLE, message, true);
    211         }
    212     }
    213 
    214     public void send( int channel, Message message )
    215     {
    216         if( channel < 0 || channel + CH_FIRST >= channels.size() )
    217             throw new IllegalArgumentException( "Channel is undefined:" + channel );
    218         send( channel + CH_FIRST, message, true );
    219     }
    220 
    221     protected void send( int channel, Message message, boolean waitForConnected )
    222     {
    223         checkRunning();
    224 
    225         if( waitForConnected ) {
    226             // Make sure we aren't still connecting
    227             waitForConnected();
    228         }
    229 
    230         ByteBuffer buffer = dataBuffer.get();
    231         if( buffer == null ) {
    232             buffer = ByteBuffer.allocate( 65536 + 2 );
    233             dataBuffer.set(buffer);
    234         }
    235         buffer.clear();
    236 
    237         // Convert the message to bytes
    238         buffer = MessageProtocol.messageToBuffer(message, buffer);
    239 
    240         // Since we share the buffer between invocations, we will need to
    241         // copy this message's part out of it.  This is because we actually
    242         // do the send on a background thread.
    243         byte[] temp = new byte[buffer.remaining()];
    244         System.arraycopy(buffer.array(), buffer.position(), temp, 0, buffer.remaining());
    245         buffer = ByteBuffer.wrap(temp);
    246 
    247         channels.get(channel).write(buffer);
    248     }
    249 
    250     public void close()
    251     {
    252         checkRunning();
    253 
    254         closeConnections( null );
    255     }
    256 
    257     protected void closeConnections( DisconnectInfo info )
    258     {
    259         if( !isRunning )
    260             return;
    261 
    262         // Send a close message
    263 
    264         // Tell the thread it's ok to die
    265         for( ConnectorAdapter ca : channels ) {
    266             if( ca == null )
    267                 continue;
    268             ca.close();
    269         }
    270 
    271         // Wait for the threads?
    272 
    273         // Just in case we never fully connected
    274         connecting.countDown();
    275 
    276         fireDisconnected(info);
    277 
    278         isRunning = false;
    279     }
    280 
    281     public void addClientStateListener( ClientStateListener listener )
    282     {
    283         stateListeners.add( listener );
    284     }
    285 
    286     public void removeClientStateListener( ClientStateListener listener )
    287     {
    288         stateListeners.remove( listener );
    289     }
    290 
    291     public void addMessageListener( MessageListener<? super Client> listener )
    292     {
    293         messageListeners.addMessageListener( listener );
    294     }
    295 
    296     public void addMessageListener( MessageListener<? super Client> listener, Class... classes )
    297     {
    298         messageListeners.addMessageListener( listener, classes );
    299     }
    300 
    301     public void removeMessageListener( MessageListener<? super Client> listener )
    302     {
    303         messageListeners.removeMessageListener( listener );
    304     }
    305 
    306     public void removeMessageListener( MessageListener<? super Client> listener, Class... classes )
    307     {
    308         messageListeners.removeMessageListener( listener, classes );
    309     }
    310 
    311     public void addErrorListener( ErrorListener<? super Client> listener )
    312     {
    313         errorListeners.add( listener );
    314     }
    315 
    316     public void removeErrorListener( ErrorListener<? super Client> listener )
    317     {
    318         errorListeners.remove( listener );
    319     }
    320 
    321     protected void fireConnected()
    322     {
    323         for( ClientStateListener l : stateListeners ) {
    324             l.clientConnected( this );
    325         }
    326     }
    327 
    328     protected void fireDisconnected( DisconnectInfo info )
    329     {
    330         for( ClientStateListener l : stateListeners ) {
    331             l.clientDisconnected( this, info );
    332         }
    333     }
    334 
    335     /**
    336      *  Either calls the ErrorListener or closes the connection
    337      *  if there are no listeners.
    338      */
    339     protected void handleError( Throwable t )
    340     {
    341         // If there are no listeners then close the connection with
    342         // a reason
    343         if( errorListeners.isEmpty() ) {
    344             log.log( Level.SEVERE, "Termining connection due to unhandled error", t );
    345             DisconnectInfo info = new DisconnectInfo();
    346             info.reason = "Connection Error";
    347             info.error = t;
    348             closeConnections(info);
    349             return;
    350         }
    351 
    352         for( ErrorListener l : errorListeners ) {
    353             l.handleError( this, t );
    354         }
    355     }
    356 
    357     protected void configureChannels( long tempId, int[] ports ) {
    358 
    359         try {
    360             for( int i = 0; i < ports.length; i++ ) {
    361                 Connector c = connectorFactory.createConnector( i, ports[i] );
    362                 ConnectorAdapter ca = new ConnectorAdapter(c, dispatcher, dispatcher, true);
    363                 int ch = channels.size();
    364                 channels.add( ca );
    365 
    366                 // Need to send the connection its hook-up registration
    367                 // and start it.
    368                 ca.start();
    369                 ClientRegistrationMessage reg;
    370                 reg = new ClientRegistrationMessage();
    371                 reg.setId(tempId);
    372                 reg.setReliable(true);
    373                 send( ch, reg, false );
    374             }
    375         } catch( IOException e ) {
    376             throw new RuntimeException( "Error configuring channels", e );
    377         }
    378     }
    379 
    380     protected void dispatch( Message m )
    381     {
    382         // Pull off the connection management messages we're
    383         // interested in and then pass on the rest.
    384         if( m instanceof ClientRegistrationMessage ) {
    385             // Then we've gotten our real id
    386             this.id = (int)((ClientRegistrationMessage)m).getId();
    387             log.log( Level.INFO, "Connection established, id:{0}.", this.id );
    388             connecting.countDown();
    389             fireConnected();
    390             return;
    391         } else if( m instanceof ChannelInfoMessage ) {
    392             // This is an interum step in the connection process and
    393             // now we need to add a bunch of connections
    394             configureChannels( ((ChannelInfoMessage)m).getId(), ((ChannelInfoMessage)m).getPorts() );
    395             return;
    396         } else if( m instanceof DisconnectMessage ) {
    397             // Can't do too much else yet
    398             String reason = ((DisconnectMessage)m).getReason();
    399             log.log( Level.SEVERE, "Connection terminated, reason:{0}.", reason );
    400             DisconnectInfo info = new DisconnectInfo();
    401             info.reason = reason;
    402             closeConnections(info);
    403         }
    404 
    405         // Make sure client MessageListeners are called single-threaded
    406         // since it could receive messages from the TCP and UDP
    407         // thread simultaneously.
    408         synchronized( this ) {
    409             messageListeners.messageReceived( this, m );
    410         }
    411     }
    412 
    413     protected class Redispatch implements MessageListener<Object>, ErrorListener<Object>
    414     {
    415         public void messageReceived( Object source, Message m )
    416         {
    417             dispatch( m );
    418         }
    419 
    420         public void handleError( Object source, Throwable t )
    421         {
    422             // Only doing the DefaultClient.this to make the code
    423             // checker happy... it compiles fine without it but I
    424             // don't like red lines in my editor. :P
    425             DefaultClient.this.handleError( t );
    426         }
    427     }
    428 }
    429