Home | History | Annotate | Download | only in udp
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.kernel.udp;
     34 
     35 import com.jme3.network.Filter;
     36 import com.jme3.network.kernel.*;
     37 import java.io.IOException;
     38 import java.net.*;
     39 import java.nio.ByteBuffer;
     40 import java.util.Map;
     41 import java.util.concurrent.ConcurrentHashMap;
     42 import java.util.concurrent.ExecutorService;
     43 import java.util.concurrent.Executors;
     44 import java.util.concurrent.atomic.AtomicBoolean;
     45 import java.util.logging.Level;
     46 import java.util.logging.Logger;
     47 
     48 /**
     49  *  A Kernel implementation using UDP packets.
     50  *
     51  *  @version   $Revision: 8944 $
     52  *  @author    Paul Speed
     53  */
     54 public class UdpKernel extends AbstractKernel
     55 {
     56     static Logger log = Logger.getLogger(UdpKernel.class.getName());
     57 
     58     private InetSocketAddress address;
     59     private HostThread thread;
     60 
     61     private ExecutorService writer;
     62 
     63     // The nature of UDP means that even through a firewall,
     64     // a user would have to have a unique address+port since UDP
     65     // can't really be NAT'ed.
     66     private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>();
     67 
     68     public UdpKernel( InetAddress host, int port )
     69     {
     70         this( new InetSocketAddress(host, port) );
     71     }
     72 
     73     public UdpKernel( int port ) throws IOException
     74     {
     75         this( new InetSocketAddress(port) );
     76     }
     77 
     78     public UdpKernel( InetSocketAddress address )
     79     {
     80         this.address = address;
     81     }
     82 
     83     protected HostThread createHostThread()
     84     {
     85         return new HostThread();
     86     }
     87 
     88     public void initialize()
     89     {
     90         if( thread != null )
     91             throw new IllegalStateException( "Kernel already initialized." );
     92 
     93         writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer"));
     94 
     95         thread = createHostThread();
     96 
     97         try {
     98             thread.connect();
     99             thread.start();
    100         } catch( IOException e ) {
    101             throw new KernelException( "Error hosting:" + address, e );
    102         }
    103     }
    104 
    105     public void terminate() throws InterruptedException
    106     {
    107         if( thread == null )
    108             throw new IllegalStateException( "Kernel not initialized." );
    109 
    110         try {
    111             thread.close();
    112             writer.shutdown();
    113             thread = null;
    114         } catch( IOException e ) {
    115             throw new KernelException( "Error closing host connection:" + address, e );
    116         }
    117     }
    118 
    119     /**
    120      *  Dispatches the data to all endpoints managed by the
    121      *  kernel.  'routing' is currently ignored.
    122      */
    123     public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
    124                            boolean copy )
    125     {
    126         if( reliable )
    127             throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );
    128 
    129         if( copy ) {
    130             // Copy the data just once
    131             byte[] temp = new byte[data.remaining()];
    132             System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
    133             data = ByteBuffer.wrap(temp);
    134         }
    135 
    136         // Hand it to all of the endpoints that match our routing
    137         for( UdpEndpoint p : socketEndpoints.values() ) {
    138             // Does it match the filter?
    139             if( filter != null && !filter.apply(p) )
    140                 continue;
    141 
    142             // Send the data
    143             p.send( data );
    144         }
    145     }
    146 
    147     protected Endpoint getEndpoint( SocketAddress address, boolean create )
    148     {
    149         UdpEndpoint p = socketEndpoints.get(address);
    150         if( p == null && create ) {
    151             p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() );
    152             socketEndpoints.put( address, p );
    153 
    154             // Add an event for it.
    155             addEvent( EndpointEvent.createAdd( this, p ) );
    156         }
    157         return p;
    158     }
    159 
    160     /**
    161      *  Called by the endpoints when they need to be closed.
    162      */
    163     protected void closeEndpoint( UdpEndpoint p ) throws IOException
    164     {
    165         // Just book-keeping to do here.
    166         if( socketEndpoints.remove( p.getRemoteAddress() ) == null )
    167             return;
    168 
    169         log.log( Level.INFO, "Closing endpoint:{0}.", p );
    170         log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );
    171 
    172         addEvent( EndpointEvent.createRemove( this, p ) );
    173 
    174         // If there are no pending messages then add one so that the
    175         // kernel-user knows to wake up if it is only listening for
    176         // envelopes.
    177         if( !hasEnvelopes() ) {
    178             // Note: this is not really a race condition.  At worst, our
    179             // event has already been handled by now and it does no harm
    180             // to check again.
    181             addEnvelope( EVENTS_PENDING );
    182         }
    183     }
    184 
    185     protected void newData( DatagramPacket packet )
    186     {
    187         // So the tricky part here is figuring out the endpoint and
    188         // whether it's new or not.  In these UDP schemes, firewalls have
    189         // to be ported back to a specific machine so we will consider
    190         // the address + port (ie: SocketAddress) the defacto unique
    191         // ID.
    192         Endpoint p = getEndpoint( packet.getSocketAddress(), true );
    193 
    194         // We'll copy the data to trim it.
    195         byte[] data = new byte[packet.getLength()];
    196         System.arraycopy(packet.getData(), 0, data, 0, data.length);
    197 
    198         Envelope env = new Envelope( p, data, false );
    199         addEnvelope( env );
    200     }
    201 
    202     protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet )
    203     {
    204         writer.execute( new MessageWriter(endpoint, packet) );
    205     }
    206 
    207     protected class MessageWriter implements Runnable
    208     {
    209         private Endpoint endpoint;
    210         private DatagramPacket packet;
    211 
    212         public MessageWriter( Endpoint endpoint, DatagramPacket packet )
    213         {
    214             this.endpoint = endpoint;
    215             this.packet = packet;
    216         }
    217 
    218         public void run()
    219         {
    220             // Not guaranteed to always work but an extra datagram
    221             // to a dead connection isn't so big of a deal.
    222             if( !endpoint.isConnected() ) {
    223                 return;
    224             }
    225 
    226             try {
    227                 thread.getSocket().send(packet);
    228             } catch( Exception e ) {
    229                 KernelException exc = new KernelException( "Error sending datagram to:" + address, e );
    230                 exc.fillInStackTrace();
    231                 reportError(exc);
    232             }
    233         }
    234     }
    235 
    236     protected class HostThread extends Thread
    237     {
    238         private DatagramSocket socket;
    239         private AtomicBoolean go = new AtomicBoolean(true);
    240 
    241         private byte[] buffer = new byte[65535]; // slightly bigger than needed.
    242 
    243         public HostThread()
    244         {
    245             setName( "UDP Host@" + address );
    246             setDaemon(true);
    247         }
    248 
    249         protected DatagramSocket getSocket()
    250         {
    251             return socket;
    252         }
    253 
    254         public void connect() throws IOException
    255         {
    256             socket = new DatagramSocket( address );
    257             log.log( Level.INFO, "Hosting UDP connection:{0}.", address );
    258         }
    259 
    260         public void close() throws IOException, InterruptedException
    261         {
    262             // Set the thread to stop
    263             go.set(false);
    264 
    265             // Make sure the channel is closed
    266             socket.close();
    267 
    268             // And wait for it
    269             join();
    270         }
    271 
    272         public void run()
    273         {
    274             log.log( Level.INFO, "Kernel started for connection:{0}.", address );
    275 
    276             // An atomic is safest and costs almost nothing
    277             while( go.get() ) {
    278                 try {
    279                     // Could reuse the packet but I don't see the
    280                     // point and it may lead to subtle bugs if not properly
    281                     // reset.
    282                     DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
    283                     socket.receive(packet);
    284 
    285                     newData( packet );
    286                 } catch( IOException e ) {
    287                     if( !go.get() )
    288                         return;
    289                     reportError( e );
    290                 }
    291             }
    292         }
    293     }
    294 }
    295