Home | History | Annotate | Download | only in tcp
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.kernel.tcp;
     34 
     35 import com.jme3.network.kernel.Endpoint;
     36 import com.jme3.network.kernel.Kernel;
     37 import com.jme3.network.kernel.KernelException;
     38 import java.io.IOException;
     39 import java.nio.ByteBuffer;
     40 import java.nio.channels.SocketChannel;
     41 import java.util.concurrent.ConcurrentLinkedQueue;
     42 
     43 
     44 /**
     45  *  Endpoint implementation that encapsulates the
     46  *  channel IO based connection information and keeps
     47  *  track of the outbound data queue for the channel.
     48  *
     49  *  @version   $Revision: 8944 $
     50  *  @author    Paul Speed
     51  */
     52 public class NioEndpoint implements Endpoint
     53 {
     54     protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0);
     55 
     56     private long id;
     57     private SocketChannel socket;
     58     private SelectorKernel kernel;
     59     private ConcurrentLinkedQueue<ByteBuffer> outbound = new ConcurrentLinkedQueue<ByteBuffer>();
     60     private boolean closing = false;
     61 
     62     public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket )
     63     {
     64         this.id = id;
     65         this.socket = socket;
     66         this.kernel = kernel;
     67     }
     68 
     69     public Kernel getKernel()
     70     {
     71         return kernel;
     72     }
     73 
     74     public void close()
     75     {
     76         close(false);
     77     }
     78 
     79     public void close( boolean flushData )
     80     {
     81         if( flushData ) {
     82             closing = true;
     83 
     84             // Enqueue a close marker message to let the server
     85             // know we should close
     86             send( CLOSE_MARKER, false, true );
     87 
     88             return;
     89         }
     90 
     91         try {
     92             // Note: even though we may be disconnected from the socket.isConnected()
     93             // standpoint, it's still safest to tell the kernel so that it can be sure
     94             // to stop managing us gracefully.
     95             kernel.closeEndpoint(this);
     96         } catch( IOException e ) {
     97             throw new KernelException( "Error closing endpoint for socket:" + socket, e );
     98         }
     99     }
    100 
    101     public long getId()
    102     {
    103         return id;
    104     }
    105 
    106     public String getAddress()
    107     {
    108         return String.valueOf(socket.socket().getRemoteSocketAddress());
    109     }
    110 
    111     public boolean isConnected()
    112     {
    113         return socket.isConnected();
    114     }
    115 
    116     /**
    117      *  The wakeup option is used internally when the kernel is
    118      *  broadcasting out to a bunch of endpoints and doesn't want to
    119      *  necessarily wakeup right away.
    120      */
    121     protected void send( ByteBuffer data, boolean copy, boolean wakeup )
    122     {
    123         // We create a ByteBuffer per endpoint since we
    124         // use it to track the data sent to each endpoint
    125         // separately.
    126         ByteBuffer buffer;
    127         if( !copy ) {
    128             buffer = data;
    129         } else {
    130             // Copy the buffer
    131             buffer = ByteBuffer.allocate(data.remaining());
    132             buffer.put(data);
    133             buffer.flip();
    134         }
    135 
    136         // Queue it up
    137         outbound.add(buffer);
    138 
    139         if( wakeup )
    140             kernel.wakeupSelector();
    141     }
    142 
    143     /**
    144      *  Called by the SelectorKernel to get the current top
    145      *  buffer for writing.
    146      */
    147     protected ByteBuffer peekPending()
    148     {
    149         return outbound.peek();
    150     }
    151 
    152     /**
    153      *  Called by the SelectorKernel when the top buffer
    154      *  has been exhausted.
    155      */
    156     protected ByteBuffer removePending()
    157     {
    158         return outbound.poll();
    159     }
    160 
    161     protected boolean hasPending()
    162     {
    163         return !outbound.isEmpty();
    164     }
    165 
    166     public void send( ByteBuffer data )
    167     {
    168         if( data == null ) {
    169             throw new IllegalArgumentException( "Data cannot be null." );
    170         }
    171         if( closing ) {
    172             throw new KernelException( "Endpoint has been closed:" + socket );
    173         }
    174         send( data, true, true );
    175     }
    176 
    177     public String toString()
    178     {
    179         return "NioEndpoint[" + id + ", " + socket + "]";
    180     }
    181 }
    182