/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.jme3.network.base;

import com.jme3.network.*;
import com.jme3.network.kernel.Endpoint;
import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ChannelInfoMessage;
import com.jme3.network.message.ClientRegistrationMessage;
import com.jme3.network.message.DisconnectMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *  A default implementation of the Server interface that delegates
 *  its network connectivity to kernel.Kernel.
 *
 *  <p>The server keeps one KernelAdapter per channel.  Index 0 is always
 *  the reliable channel, index 1 is the optional "fast" (unreliable)
 *  channel and any further indexes are the alternate channels created
 *  through {@link #addChannel(int)}.</p>
 *
 *  @version   $Revision: 9114 $
 *  @author    Paul Speed
 */
public class DefaultServer implements Server
{
    static Logger log = Logger.getLogger(DefaultServer.class.getName());

    // First two channels are reserved for reliable and
    // unreliable
    private static final int CH_RELIABLE = 0;
    private static final int CH_UNRELIABLE = 1;
    private static final int CH_FIRST = 2;

    private boolean isRunning = false;
    private final AtomicInteger nextId = new AtomicInteger(0);
    private final String gameName;
    private final int version;
    private KernelFactory kernelFactory = KernelFactory.DEFAULT;
    private KernelAdapter reliableAdapter;
    private KernelAdapter fastAdapter;
    private final List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
    private final List<Integer> alternatePorts = new ArrayList<Integer>();
    private final Redispatch dispatcher = new Redispatch();
    private final Map<Integer,HostedConnection> connections
                = new ConcurrentHashMap<Integer,HostedConnection>();
    private final Map<Endpoint,HostedConnection> endpointConnections
                = new ConcurrentHashMap<Endpoint,HostedConnection>();

    // Keeps track of clients for whom we've only received the UDP
    // registration message
    private final Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();

    private final MessageListenerRegistry<HostedConnection> messageListeners
                = new MessageListenerRegistry<HostedConnection>();
    private final List<ConnectionListener> connectionListeners
                = new CopyOnWriteArrayList<ConnectionListener>();

    /**
     *  Creates a server that wraps the supplied kernels in KernelAdapters.
     *
     *  @param gameName the name compared against the one sent by
     *                  registering clients.
     *  @param version  the version compared against the one sent by
     *                  registering clients.
     *  @param reliable the kernel backing the reliable channel; required.
     *  @param fast     the kernel backing the unreliable channel, or null
     *                  to run with the reliable channel only.
     *  @throws IllegalArgumentException if reliable is null.
     */
    public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
    {
        if( reliable == null )
            throw new IllegalArgumentException( "Default server requires a reliable kernel instance." );

        this.gameName = gameName;
        this.version = version;

        reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
        channels.add( reliableAdapter );
        if( fast != null ) {
            fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
            channels.add( fastAdapter );
        }
    }

    /**
     *  Returns the game name this server was constructed with.
     */
    public String getGameName()
    {
        return gameName;
    }

    /**
     *  Returns the version this server was constructed with.
     */
    public int getVersion()
    {
        return version;
    }

    /**
     *  Adds an alternate channel on the specified port.  Only allowed
     *  before the server is started.
     *
     *  @param port the port handed to the kernel factory for the new channel.
     *  @return the index of the new channel, suitable for the
     *          channel-specific broadcast() and send() methods.
     *  @throws IllegalStateException if the server is already running or
     *          if the channel and port lists are out of step (which is
     *          the case when no fast kernel was supplied).
     *  @throws RuntimeException wrapping the IOException if the kernel
     *          could not be created.
     */
    public int addChannel( int port )
    {
        if( isRunning )
            throw new IllegalStateException( "Channels cannot be added once server is started." );

        // Note: it does bug me that channels aren't 100% universal and
        // setup externally but it requires a more invasive set of changes
        // for "connection types" and some kind of registry of kernel and
        // connector factories.  This really would be the best approach and
        // would allow all kinds of channel customization maybe... but for
        // now, we hard-code the standard connections and treat the +2 extras
        // differently.

        // Check for consistency with the channels list
        if( channels.size() - CH_FIRST != alternatePorts.size() )
            throw new IllegalStateException( "Channel and port lists do not match." );

        try {
            int result = alternatePorts.size();

            // Build everything that can fail first so that a failure
            // does not leave a port recorded without a matching channel,
            // which would make every later addChannel() call fail the
            // consistency check above.
            Kernel kernel = kernelFactory.createKernel(result, port);
            KernelAdapter adapter = new KernelAdapter(this, kernel, dispatcher, true);

            alternatePorts.add(port);
            channels.add(adapter);

            return result;
        } catch( IOException e ) {
            throw new RuntimeException( "Error adding channel for port:" + port, e );
        }
    }

    /**
     *  Validates an alternate channel index.
     *
     *  @throws IllegalArgumentException if the channel is negative or
     *          not less than the number of alternate channels added.
     */
    protected void checkChannel( int channel )
    {
        if( channel < 0 || channel >= alternatePorts.size() )
            throw new IllegalArgumentException( "Channel is undefined:" + channel );
    }

    /**
     *  Initializes and then starts every channel adapter.
     *
     *  @throws IllegalStateException if the server is already running.
     */
    public void start()
    {
        if( isRunning )
            throw new IllegalStateException( "Server is already started." );

        // Initialize the kernels
        for( KernelAdapter ka : channels ) {
            ka.initialize();
        }

        // Start em up
        for( KernelAdapter ka : channels ) {
            ka.start();
        }

        isRunning = true;
    }

    /**
     *  Returns true between a successful start() and a successful close().
     */
    public boolean isRunning()
    {
        return isRunning;
    }

    /**
     *  Closes every channel adapter and marks the server as stopped.
     *
     *  @throws IllegalStateException if the server is not running.
     *  @throws RuntimeException if the calling thread is interrupted while
     *          closing; the thread's interrupt flag is restored first and
     *          the server is left marked as running.
     */
    public void close()
    {
        if( !isRunning )
            throw new IllegalStateException( "Server is not started." );

        try {
            // Kill the adapters, they will kill the kernels
            for( KernelAdapter ka : channels ) {
                ka.close();
            }

            isRunning = false;
        } catch( InterruptedException e ) {
            // Restore the flag so code further up the stack can still
            // observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException( "Interrupted while closing", e );
        }
    }

    /**
     *  Sends the message to all connections.
     */
    public void broadcast( Message message )
    {
        broadcast( null, message );
    }

    /**
     *  Sends the message to all connections accepted by the filter,
     *  or to all connections if the filter is null.  Reliable messages,
     *  and all messages when there is no fast channel, go out over the
     *  reliable adapter; everything else uses the fast adapter.
     *  Does nothing if there are no connections.
     */
    public void broadcast( Filter<? super HostedConnection> filter, Message message )
    {
        if( connections.isEmpty() )
            return;

        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);

        FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);

        if( message.isReliable() || fastAdapter == null ) {
            // Don't need to copy the data because message protocol is already
            // giving us a fresh buffer
            reliableAdapter.broadcast( adapter, buffer, true, false );
        } else {
            fastAdapter.broadcast( adapter, buffer, false, false );
        }
    }

    /**
     *  Sends the message over the specified alternate channel to all
     *  connections accepted by the filter, or to all connections if the
     *  filter is null.  Does nothing, not even channel validation, if
     *  there are no connections.
     *
     *  @throws IllegalArgumentException if there are connections and the
     *          channel is not a defined alternate channel.
     */
    public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
    {
        if( connections.isEmpty() )
            return;

        checkChannel(channel);

        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);

        FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);

        // Alternate channels live after the two reserved slots
        channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
    }

    /**
     *  Returns the fully registered connection with the specified ID
     *  or null if there is none.
     */
    public HostedConnection getConnection( int id )
    {
        return connections.get(id);
    }

    /**
     *  Returns true if there is at least one fully registered connection.
     */
    public boolean hasConnections()
    {
        return !connections.isEmpty();
    }

    /**
     *  Returns a read-only view of the fully registered connections.
     */
    public Collection<HostedConnection> getConnections()
    {
        return Collections.unmodifiableCollection(connections.values());
    }

    /**
     *  Adds a listener that is notified when connections are added
     *  or removed.
     */
    public void addConnectionListener( ConnectionListener listener )
    {
        connectionListeners.add(listener);
    }

    /**
     *  Removes a previously added connection listener.
     */
    public void removeConnectionListener( ConnectionListener listener )
    {
        connectionListeners.remove(listener);
    }

    /**
     *  Registers the listener with the message listener registry.
     */
    public void addMessageListener( MessageListener<? super HostedConnection> listener )
    {
        messageListeners.addMessageListener( listener );
    }

    /**
     *  Registers the listener with the message listener registry
     *  for the specified message classes.
     */
    public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
    {
        messageListeners.addMessageListener( listener, classes );
    }

    /**
     *  Removes the listener from the message listener registry.
     */
    public void removeMessageListener( MessageListener<? super HostedConnection> listener )
    {
        messageListeners.removeMessageListener( listener );
    }

    /**
     *  Removes the listener from the message listener registry
     *  for the specified message classes.
     */
    public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
    {
        messageListeners.removeMessageListener( listener, classes );
    }

    /**
     *  Delivers a received message to the registered message listeners.
     *  When the source is non-null, delivery is synchronized on the
     *  source connection.
     */
    protected void dispatch( HostedConnection source, Message m )
    {
        if( source == null ) {
            messageListeners.messageReceived( source, m );
        } else {

            // A semi-heavy handed way to make sure the listener
            // doesn't get called at the same time from two different
            // threads for the same hosted connection.
            synchronized( source ) {
                messageListeners.messageReceived( source, m );
            }
        }
    }

    /**
     *  Notifies every connection listener that the connection was added.
     */
    protected void fireConnectionAdded( HostedConnection conn )
    {
        for( ConnectionListener l : connectionListeners ) {
            l.connectionAdded( this, conn );
        }
    }

    /**
     *  Notifies every connection listener that the connection was removed.
     */
    protected void fireConnectionRemoved( HostedConnection conn )
    {
        for( ConnectionListener l : connectionListeners ) {
            l.connectionRemoved( this, conn );
        }
    }

    /**
     *  Returns the index of the adapter in the channels list
     *  or -1 if it is not one of this server's adapters.
     */
    protected int getChannel( KernelAdapter ka )
    {
        return channels.indexOf(ka);
    }

    /**
     *  Handles a registration message received on one of the channels.
     *  A connection is accumulated under the temporary ID carried by the
     *  message until an endpoint has been recorded for every channel, at
     *  which point it is published in the connection maps, the client
     *  is sent its real ID and the connection listeners are notified.
     *
     *  <p>Registrations arriving on the reliable channel are additionally
     *  checked against the server's game name and version; on a mismatch
     *  the connection is closed with a reason and dropped.</p>
     *
     *  @param ka the adapter the registration message arrived on.
     *  @param p  the endpoint the registration message arrived from.
     *  @param m  the registration message.
     */
    protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
    {
        Connection addedConnection = null;

        // generally this will only be called by one thread but it's
        // important enough I won't take chances
        synchronized( this ) {
            // Grab the random ID that the client created when creating
            // its two registration messages
            long tempId = m.getId();

            // See if we already have one
            Connection c = connecting.remove(tempId);
            if( c == null ) {
                c = new Connection(channels.size());
                log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
            } else {
                log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
            }

            // Fill in what we now know
            int channel = getChannel(ka);
            c.setChannel(channel, p);
            log.log( Level.FINE, "Setting up channel:{0}", channel );

            // If it's channel 0 then this is the initial connection
            // and we will send the connection information
            if( channel == CH_RELIABLE ) {
                // Validate the name and version which is only sent
                // over the reliable connection at this point.
                if( !getGameName().equals(m.getGameName())
                    || getVersion() != m.getVersion() ) {

                    log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );

                    // Need to kick them off... I may regret doing this from within
                    // the sync block but the alternative is more code
                    c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
                             + " client:" + m.getGameName() + " v" + m.getVersion() );
                    return;
                }

                // Else send the extra channel information to the client
                if( !alternatePorts.isEmpty() ) {
                    ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
                    c.send(cim);
                }
            }

            if( c.isComplete() ) {
                // Then we are fully connected
                if( connections.put( c.getId(), c ) == null ) {

                    for( Endpoint cp : c.channels ) {
                        if( cp == null )
                            continue;
                        endpointConnections.put( cp, c );
                    }

                    addedConnection = c;
                }
            } else {
                // Need to keep getting channels so we'll keep it in
                // the map
                connecting.put(tempId, c);
            }
        }

        // Best to do this outside of the synch block to avoid
        // over synchronizing which is the path to deadlocks
        if( addedConnection != null ) {
            log.log( Level.INFO, "Client registered:{0}.", addedConnection );

            // Send the ID back to the client letting it know it's
            // fully connected.
            m = new ClientRegistrationMessage();
            m.setId( addedConnection.getId() );
            m.setReliable(true);
            addedConnection.send(m);

            // Now we can notify the listeners about the
            // new connection.
            fireConnectionAdded( addedConnection );
        }
    }

    /**
     *  Returns the fully registered connection that owns the endpoint
     *  or null if there is none.
     */
    protected HostedConnection getConnection( Endpoint endpoint )
    {
        return endpointConnections.get(endpoint);
    }

    /**
     *  Called when an endpoint has been closed.  Drops any half-registered
     *  connection that was using the endpoint, unmaps the endpoint and the
     *  connection that owned it, and then closes that connection unless it
     *  is already closed.
     *
     *  @param p the endpoint that was closed.
     */
    protected void connectionClosed( Endpoint p )
    {
        if( p.isConnected() ) {
            log.log( Level.INFO, "Connection closed:{0}.", p );
        } else {
            log.log( Level.FINE, "Connection closed:{0}.", p );
        }

        // Try to find the endpoint in all ways that it might
        // exist.  Note: by this point the raw network channel is
        // closed already.

        // Also note: this method will be called multiple times per
        // HostedConnection if it has multiple endpoints.

        Connection removed = null;
        synchronized( this ) {
            // Just in case the endpoint was still connecting.
            // The map's values are Connections, not Endpoints, so we
            // have to look inside each pending connection for a match.
            for( Iterator<Connection> it = connecting.values().iterator(); it.hasNext(); ) {
                if( it.next().hasEndpoint(p) ) {
                    it.remove();
                }
            }

            // And the regular management
            removed = (Connection)endpointConnections.remove(p);
            if( removed != null ) {
                connections.remove( removed.getId() );
            }

            log.log( Level.FINE, "Connections size:{0}", connections.size() );
            log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
        }

        // Better not to fire events while we hold a lock
        // so always do this outside the synch block.
        // Note: checking removed.closed just to avoid spurious log messages
        //       since in general we are called back for every endpoint closing.
        if( removed != null && !removed.closed ) {

            log.log( Level.INFO, "Client closed:{0}.", removed );

            removed.closeConnection();
        }
    }

    /**
     *  The server-side representation of one client, holding one
     *  endpoint slot per server channel plus the client's session
     *  attributes.
     */
    protected class Connection implements HostedConnection
    {
        private final int id;
        private boolean closed;
        private final Endpoint[] channels;
        private int setChannelCount = 0;

        private final Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();

        /**
         *  Creates a connection with the next free ID and the
         *  specified number of empty endpoint slots.
         */
        public Connection( int channelCount )
        {
            id = nextId.getAndIncrement();
            channels = new Endpoint[channelCount];
        }

        /**
         *  Records the endpoint for the specified channel slot.  Setting
         *  the same endpoint again is allowed and is only counted once.
         *
         *  @throws RuntimeException if the slot already holds a
         *          different endpoint.
         */
        void setChannel( int channel, Endpoint p )
        {
            Endpoint existing = channels[channel];
            if( existing != null && existing != p ) {
                throw new RuntimeException( "Channel has already been set:" + channel
                                            + " = " + existing + ", cannot be set to:" + p );
            }
            channels[channel] = p;

            // Only count the slot the first time it is filled, otherwise
            // a repeated registration on one channel would make the
            // connection look complete while other slots are still empty.
            if( existing == null && p != null )
                setChannelCount++;
        }

        /**
         *  Returns true if the endpoint is held in one of this
         *  connection's channel slots.
         */
        boolean hasEndpoint( Endpoint p )
        {
            for( Endpoint e : channels ) {
                if( e != null && e.equals(p) )
                    return true;
            }
            return false;
        }

        /**
         *  Returns true once every channel slot has been counted as set.
         */
        boolean isComplete()
        {
            return setChannelCount == channels.length;
        }

        /**
         *  Returns the endpoint in the unreliable slot or null when
         *  that slot is empty or, because the server has no fast
         *  channel, does not exist at all.
         */
        private Endpoint fastEndpoint()
        {
            return channels.length > CH_UNRELIABLE ? channels[CH_UNRELIABLE] : null;
        }

        /**
         *  Returns the server that owns this connection.
         */
        public Server getServer()
        {
            return DefaultServer.this;
        }

        /**
         *  Returns the server-assigned ID of this connection.
         */
        public int getId()
        {
            return id;
        }

        /**
         *  Returns the address reported by the reliable endpoint or
         *  null if the reliable endpoint has not been set.
         */
        public String getAddress()
        {
            return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
        }

        /**
         *  Sends the message to this client.  Reliable messages, and all
         *  messages when there is no unreliable endpoint, go out over
         *  the reliable endpoint.
         */
        public void send( Message message )
        {
            ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
            Endpoint fast = fastEndpoint();
            if( message.isReliable() || fast == null ) {
                channels[CH_RELIABLE].send( buffer );
            } else {
                fast.send( buffer );
            }
        }

        /**
         *  Sends the message to this client over the specified
         *  alternate channel.
         *
         *  @throws IllegalArgumentException if the channel is not a
         *          defined alternate channel.
         */
        public void send( int channel, Message message )
        {
            checkChannel(channel);
            ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
            channels[channel+CH_FIRST].send(buffer);
        }

        /**
         *  Closes every endpoint of this connection and notifies the
         *  connection listeners.  Does nothing if already closed.
         */
        protected void closeConnection()
        {
            if( closed )
                return;
            closed = true;

            // Make sure all endpoints are closed.  Note: reliable
            // should always already be closed through all paths that I
            // can conceive... but it doesn't hurt to be sure.
            for( Endpoint p : channels ) {
                if( p == null )
                    continue;
                p.close();
            }

            fireConnectionRemoved( this );
        }

        /**
         *  Kicks the client: sends a reliable disconnect message carrying
         *  the reason and then closes the reliable endpoint with flush.
         */
        public void close( String reason )
        {
            // Send a reason
            DisconnectMessage m = new DisconnectMessage();
            m.setType( DisconnectMessage.KICK );
            m.setReason( reason );
            m.setReliable( true );
            send( m );

            // Just close the reliable endpoint
            // fast will be cleaned up as a side-effect
            // when closeConnection() is called by the
            // connectionClosed() endpoint callback.
            if( channels[CH_RELIABLE] != null ) {
                // Close with flush so we make sure our
                // message gets out
                channels[CH_RELIABLE].close(true);
            }
        }

        /**
         *  Sets a session attribute, or removes it if the value is null.
         *
         *  @return the value previously stored under the name, or null.
         */
        public Object setAttribute( String name, Object value )
        {
            if( value == null )
                return sessionData.remove(name);
            return sessionData.put(name, value);
        }

        /**
         *  Returns the session attribute stored under the name, cast
         *  (unchecked) to the caller's expected type, or null.
         */
        @SuppressWarnings("unchecked")
        public <T> T getAttribute( String name )
        {
            return (T)sessionData.get(name);
        }

        /**
         *  Returns a read-only view of the session attribute names.
         */
        public Set<String> attributeNames()
        {
            return Collections.unmodifiableSet(sessionData.keySet());
        }

        @Override
        public String toString()
        {
            return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
                                     + ", fast=" + fastEndpoint() + " ]";
        }
    }

    /**
     *  Forwards messages received by the kernel adapters to
     *  the server's dispatch() method.
     */
    protected class Redispatch implements MessageListener<HostedConnection>
    {
        public void messageReceived( HostedConnection source, Message m )
        {
            dispatch( source, m );
        }
    }

    /**
     *  Adapts a HostedConnection filter to an Endpoint filter by
     *  resolving each endpoint to its registered connection.  Endpoints
     *  with no registered connection are rejected.
     */
    protected class FilterAdapter implements Filter<Endpoint>
    {
        private final Filter<? super HostedConnection> delegate;

        public FilterAdapter( Filter<? super HostedConnection> delegate )
        {
            this.delegate = delegate;
        }

        public boolean apply( Endpoint input )
        {
            HostedConnection conn = getConnection( input );
            if( conn == null )
                return false;
            return delegate.apply(conn);
        }
    }
}