Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.base;
     34 
     35 import com.jme3.network.*;
     36 import com.jme3.network.kernel.Endpoint;
     37 import com.jme3.network.kernel.Kernel;
     38 import com.jme3.network.message.ChannelInfoMessage;
     39 import com.jme3.network.message.ClientRegistrationMessage;
     40 import com.jme3.network.message.DisconnectMessage;
     41 import java.io.IOException;
     42 import java.nio.ByteBuffer;
     43 import java.util.*;
     44 import java.util.concurrent.ConcurrentHashMap;
     45 import java.util.concurrent.CopyOnWriteArrayList;
     46 import java.util.concurrent.atomic.AtomicInteger;
     47 import java.util.logging.Level;
     48 import java.util.logging.Logger;
     49 
     50 /**
     51  *  A default implementation of the Server interface that delegates
     52  *  its network connectivity to kernel.Kernel.
     53  *
     54  *  @version   $Revision: 9114 $
     55  *  @author    Paul Speed
     56  */
     57 public class DefaultServer implements Server
     58 {
     59     static Logger log = Logger.getLogger(DefaultServer.class.getName());
     60 
     61     // First two channels are reserved for reliable and
     62     // unreliable
     63     private static final int CH_RELIABLE = 0;
     64     private static final int CH_UNRELIABLE = 1;
     65     private static final int CH_FIRST = 2;
     66 
     67     private boolean isRunning = false;
     68     private AtomicInteger nextId = new AtomicInteger(0);
     69     private String gameName;
     70     private int version;
     71     private KernelFactory kernelFactory = KernelFactory.DEFAULT;
     72     private KernelAdapter reliableAdapter;
     73     private KernelAdapter fastAdapter;
     74     private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
     75     private List<Integer> alternatePorts = new ArrayList<Integer>();
     76     private Redispatch dispatcher = new Redispatch();
     77     private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
     78     private Map<Endpoint,HostedConnection> endpointConnections
     79                             = new ConcurrentHashMap<Endpoint,HostedConnection>();
     80 
     81     // Keeps track of clients for whom we've only received the UDP
     82     // registration message
     83     private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
     84 
     85     private MessageListenerRegistry<HostedConnection> messageListeners
     86                             = new MessageListenerRegistry<HostedConnection>();
     87     private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
     88 
     89     public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
     90     {
     91         if( reliable == null )
     92             throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
     93 
     94         this.gameName = gameName;
     95         this.version = version;
     96 
     97         reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
     98         channels.add( reliableAdapter );
     99         if( fast != null ) {
    100             fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
    101             channels.add( fastAdapter );
    102         }
    103     }
    104 
    105     public String getGameName()
    106     {
    107         return gameName;
    108     }
    109 
    110     public int getVersion()
    111     {
    112         return version;
    113     }
    114 
    115     public int addChannel( int port )
    116     {
    117         if( isRunning )
    118             throw new IllegalStateException( "Channels cannot be added once server is started." );
    119 
    120         // Note: it does bug me that channels aren't 100% universal and
    121         // setup externally but it requires a more invasive set of changes
    122         // for "connection types" and some kind of registry of kernel and
    123         // connector factories.  This really would be the best approach and
    124         // would allow all kinds of channel customization maybe... but for
    125         // now, we hard-code the standard connections and treat the +2 extras
    126         // differently.
    127 
    128         // Check for consistency with the channels list
    129         if( channels.size() - CH_FIRST != alternatePorts.size() )
    130             throw new IllegalStateException( "Channel and port lists do not match." );
    131 
    132         try {
    133             int result = alternatePorts.size();
    134             alternatePorts.add(port);
    135 
    136             Kernel kernel = kernelFactory.createKernel(result, port);
    137             channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
    138 
    139             return result;
    140         } catch( IOException e ) {
    141             throw new RuntimeException( "Error adding channel for port:" + port, e );
    142         }
    143     }
    144 
    145     protected void checkChannel( int channel )
    146     {
    147         if( channel < 0 || channel >= alternatePorts.size() )
    148             throw new IllegalArgumentException( "Channel is undefined:" + channel );
    149     }
    150 
    151     public void start()
    152     {
    153         if( isRunning )
    154             throw new IllegalStateException( "Server is already started." );
    155 
    156         // Initialize the kernels
    157         for( KernelAdapter ka : channels ) {
    158             ka.initialize();
    159         }
    160 
    161         // Start em up
    162         for( KernelAdapter ka : channels ) {
    163             ka.start();
    164         }
    165 
    166         isRunning = true;
    167     }
    168 
    169     public boolean isRunning()
    170     {
    171         return isRunning;
    172     }
    173 
    174     public void close()
    175     {
    176         if( !isRunning )
    177             throw new IllegalStateException( "Server is not started." );
    178 
    179         try {
    180             // Kill the adpaters, they will kill the kernels
    181             for( KernelAdapter ka : channels ) {
    182                 ka.close();
    183             }
    184 
    185             isRunning = false;
    186         } catch( InterruptedException e ) {
    187             throw new RuntimeException( "Interrupted while closing", e );
    188         }
    189     }
    190 
    191     public void broadcast( Message message )
    192     {
    193         broadcast( null, message );
    194     }
    195 
    196     public void broadcast( Filter<? super HostedConnection> filter, Message message )
    197     {
    198         if( connections.isEmpty() )
    199             return;
    200 
    201         ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
    202 
    203         FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
    204 
    205         if( message.isReliable() || fastAdapter == null ) {
    206             // Don't need to copy the data because message protocol is already
    207             // giving us a fresh buffer
    208             reliableAdapter.broadcast( adapter, buffer, true, false );
    209         } else {
    210             fastAdapter.broadcast( adapter, buffer, false, false );
    211         }
    212     }
    213 
    214     public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
    215     {
    216         if( connections.isEmpty() )
    217             return;
    218 
    219         checkChannel(channel);
    220 
    221         ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
    222 
    223         FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
    224 
    225         channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
    226     }
    227 
    228     public HostedConnection getConnection( int id )
    229     {
    230         return connections.get(id);
    231     }
    232 
    233     public boolean hasConnections()
    234     {
    235         return !connections.isEmpty();
    236     }
    237 
    238     public Collection<HostedConnection> getConnections()
    239     {
    240         return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
    241     }
    242 
    243     public void addConnectionListener( ConnectionListener listener )
    244     {
    245         connectionListeners.add(listener);
    246     }
    247 
    248     public void removeConnectionListener( ConnectionListener listener )
    249     {
    250         connectionListeners.remove(listener);
    251     }
    252 
    253     public void addMessageListener( MessageListener<? super HostedConnection> listener )
    254     {
    255         messageListeners.addMessageListener( listener );
    256     }
    257 
    258     public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
    259     {
    260         messageListeners.addMessageListener( listener, classes );
    261     }
    262 
    263     public void removeMessageListener( MessageListener<? super HostedConnection> listener )
    264     {
    265         messageListeners.removeMessageListener( listener );
    266     }
    267 
    268     public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
    269     {
    270         messageListeners.removeMessageListener( listener, classes );
    271     }
    272 
    273     protected void dispatch( HostedConnection source, Message m )
    274     {
    275         if( source == null ) {
    276             messageListeners.messageReceived( source, m );
    277         } else {
    278 
    279             // A semi-heavy handed way to make sure the listener
    280             // doesn't get called at the same time from two different
    281             // threads for the same hosted connection.
    282             synchronized( source ) {
    283                 messageListeners.messageReceived( source, m );
    284             }
    285         }
    286     }
    287 
    288     protected void fireConnectionAdded( HostedConnection conn )
    289     {
    290         for( ConnectionListener l : connectionListeners ) {
    291             l.connectionAdded( this, conn );
    292         }
    293     }
    294 
    295     protected void fireConnectionRemoved( HostedConnection conn )
    296     {
    297         for( ConnectionListener l : connectionListeners ) {
    298             l.connectionRemoved( this, conn );
    299         }
    300     }
    301 
    302     protected int getChannel( KernelAdapter ka )
    303     {
    304         return channels.indexOf(ka);
    305     }
    306 
    307     protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
    308     {
    309         Connection addedConnection = null;
    310 
    311         // generally this will only be called by one thread but it's
    312         // important enough I won't take chances
    313         synchronized( this ) {
    314             // Grab the random ID that the client created when creating
    315             // its two registration messages
    316             long tempId = m.getId();
    317 
    318             // See if we already have one
    319             Connection c = connecting.remove(tempId);
    320             if( c == null ) {
    321                 c = new Connection(channels.size());
    322                 log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
    323             } else {
    324                 log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
    325             }
    326 
    327             // Fill in what we now know
    328             int channel = getChannel(ka);
    329             c.setChannel(channel, p);
    330             log.log( Level.FINE, "Setting up channel:{0}", channel );
    331 
    332             // If it's channel 0 then this is the initial connection
    333             // and we will send the connection information
    334             if( channel == CH_RELIABLE ) {
    335                 // Validate the name and version which is only sent
    336                 // over the reliable connection at this point.
    337                 if( !getGameName().equals(m.getGameName())
    338                     || getVersion() != m.getVersion() ) {
    339 
    340                     log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
    341 
    342                     // Need to kick them off... I may regret doing this from within
    343                     // the sync block but the alternative is more code
    344                     c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
    345                              + "  client:" + m.getGameName() + " v" + m.getVersion() );
    346                     return;
    347                 }
    348 
    349                 // Else send the extra channel information to the client
    350                 if( !alternatePorts.isEmpty() ) {
    351                     ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
    352                     c.send(cim);
    353                 }
    354             }
    355 
    356             if( c.isComplete() ) {
    357                 // Then we are fully connected
    358                 if( connections.put( c.getId(), c ) == null ) {
    359 
    360                     for( Endpoint cp : c.channels ) {
    361                         if( cp == null )
    362                             continue;
    363                         endpointConnections.put( cp, c );
    364                     }
    365 
    366                     addedConnection = c;
    367                 }
    368             } else {
    369                 // Need to keep getting channels so we'll keep it in
    370                 // the map
    371                 connecting.put(tempId, c);
    372             }
    373         }
    374 
    375         // Best to do this outside of the synch block to avoid
    376         // over synchronizing which is the path to deadlocks
    377         if( addedConnection != null ) {
    378             log.log( Level.INFO, "Client registered:{0}.", addedConnection );
    379 
    380             // Send the ID back to the client letting it know it's
    381             // fully connected.
    382             m = new ClientRegistrationMessage();
    383             m.setId( addedConnection.getId() );
    384             m.setReliable(true);
    385             addedConnection.send(m);
    386 
    387             // Now we can notify the listeners about the
    388             // new connection.
    389             fireConnectionAdded( addedConnection );
    390         }
    391     }
    392 
    393     protected HostedConnection getConnection( Endpoint endpoint )
    394     {
    395         return endpointConnections.get(endpoint);
    396     }
    397 
    398     protected void connectionClosed( Endpoint p )
    399     {
    400         if( p.isConnected() ) {
    401             log.log( Level.INFO, "Connection closed:{0}.", p );
    402         } else {
    403             log.log( Level.FINE, "Connection closed:{0}.", p );
    404         }
    405 
    406         // Try to find the endpoint in all ways that it might
    407         // exist.  Note: by this point the raw network channel is
    408         // closed already.
    409 
    410         // Also note: this method will be called multiple times per
    411         // HostedConnection if it has multiple endpoints.
    412 
    413         Connection removed = null;
    414         synchronized( this ) {
    415             // Just in case the endpoint was still connecting
    416             connecting.values().remove(p);
    417 
    418             // And the regular management
    419             removed = (Connection)endpointConnections.remove(p);
    420             if( removed != null ) {
    421                 connections.remove( removed.getId() );
    422             }
    423 
    424             log.log( Level.FINE, "Connections size:{0}", connections.size() );
    425             log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
    426         }
    427 
    428         // Better not to fire events while we hold a lock
    429         // so always do this outside the synch block.
    430         // Note: checking removed.closed just to avoid spurious log messages
    431         //       since in general we are called back for every endpoint closing.
    432         if( removed != null && !removed.closed ) {
    433 
    434             log.log( Level.INFO, "Client closed:{0}.", removed );
    435 
    436             removed.closeConnection();
    437         }
    438     }
    439 
    440     protected class Connection implements HostedConnection
    441     {
    442         private int id;
    443         private boolean closed;
    444         private Endpoint[] channels;
    445         private int setChannelCount = 0;
    446 
    447         private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
    448 
    449         public Connection( int channelCount )
    450         {
    451             id = nextId.getAndIncrement();
    452             channels = new Endpoint[channelCount];
    453         }
    454 
    455         void setChannel( int channel, Endpoint p )
    456         {
    457             if( channels[channel] != null && channels[channel] != p ) {
    458                 throw new RuntimeException( "Channel has already been set:" + channel
    459                                             + " = " + channels[channel] + ", cannot be set to:" + p );
    460             }
    461             channels[channel] = p;
    462             if( p != null )
    463                 setChannelCount++;
    464         }
    465 
    466         boolean isComplete()
    467         {
    468             return setChannelCount == channels.length;
    469         }
    470 
    471         public Server getServer()
    472         {
    473             return DefaultServer.this;
    474         }
    475 
    476         public int getId()
    477         {
    478             return id;
    479         }
    480 
    481         public String getAddress()
    482         {
    483             return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
    484         }
    485 
    486         public void send( Message message )
    487         {
    488             ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
    489             if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
    490                 channels[CH_RELIABLE].send( buffer );
    491             } else {
    492                 channels[CH_UNRELIABLE].send( buffer );
    493             }
    494         }
    495 
    496         public void send( int channel, Message message )
    497         {
    498             checkChannel(channel);
    499             ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
    500             channels[channel+CH_FIRST].send(buffer);
    501         }
    502 
    503         protected void closeConnection()
    504         {
    505             if( closed )
    506                 return;
    507             closed = true;
    508 
    509             // Make sure all endpoints are closed.  Note: reliable
    510             // should always already be closed through all paths that I
    511             // can conceive... but it doesn't hurt to be sure.
    512             for( Endpoint p : channels ) {
    513                 if( p == null )
    514                     continue;
    515                 p.close();
    516             }
    517 
    518             fireConnectionRemoved( this );
    519         }
    520 
    521         public void close( String reason )
    522         {
    523             // Send a reason
    524             DisconnectMessage m = new DisconnectMessage();
    525             m.setType( DisconnectMessage.KICK );
    526             m.setReason( reason );
    527             m.setReliable( true );
    528             send( m );
    529 
    530             // Just close the reliable endpoint
    531             // fast will be cleaned up as a side-effect
    532             // when closeConnection() is called by the
    533             // connectionClosed() endpoint callback.
    534             if( channels[CH_RELIABLE] != null ) {
    535                 // Close with flush so we make sure our
    536                 // message gets out
    537                 channels[CH_RELIABLE].close(true);
    538             }
    539         }
    540 
    541         public Object setAttribute( String name, Object value )
    542         {
    543             if( value == null )
    544                 return sessionData.remove(name);
    545             return sessionData.put(name, value);
    546         }
    547 
    548         @SuppressWarnings("unchecked")
    549         public <T> T getAttribute( String name )
    550         {
    551             return (T)sessionData.get(name);
    552         }
    553 
    554         public Set<String> attributeNames()
    555         {
    556             return Collections.unmodifiableSet(sessionData.keySet());
    557         }
    558 
    559         public String toString()
    560         {
    561             return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
    562                                      + ", fast=" + channels[CH_UNRELIABLE] + " ]";
    563         }
    564     }
    565 
    566     protected class Redispatch implements MessageListener<HostedConnection>
    567     {
    568         public void messageReceived( HostedConnection source, Message m )
    569         {
    570             dispatch( source, m );
    571         }
    572     }
    573 
    574     protected class FilterAdapter implements Filter<Endpoint>
    575     {
    576         private Filter<? super HostedConnection> delegate;
    577 
    578         public FilterAdapter( Filter<? super HostedConnection> delegate )
    579         {
    580             this.delegate = delegate;
    581         }
    582 
    583         public boolean apply( Endpoint input )
    584         {
    585             HostedConnection conn = getConnection( input );
    586             if( conn == null )
    587                 return false;
    588             return delegate.apply(conn);
    589         }
    590     }
    591 }
    592