1 /* 2 * Copyright (c) 2011 jMonkeyEngine 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: 8 * 9 * * Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors 17 * may be used to endorse or promote products derived from this software 18 * without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 package com.jme3.network.kernel.tcp; 34 35 import com.jme3.network.kernel.Endpoint; 36 import com.jme3.network.kernel.Kernel; 37 import com.jme3.network.kernel.KernelException; 38 import java.io.IOException; 39 import java.nio.ByteBuffer; 40 import java.nio.channels.SocketChannel; 41 import java.util.concurrent.ConcurrentLinkedQueue; 42 43 44 /** 45 * Endpoint implementation that encapsulates the 46 * channel IO based connection information and keeps 47 * track of the outbound data queue for the channel. 48 * 49 * @version $Revision: 8944 $ 50 * @author Paul Speed 51 */ 52 public class NioEndpoint implements Endpoint 53 { 54 protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0); 55 56 private long id; 57 private SocketChannel socket; 58 private SelectorKernel kernel; 59 private ConcurrentLinkedQueue<ByteBuffer> outbound = new ConcurrentLinkedQueue<ByteBuffer>(); 60 private boolean closing = false; 61 62 public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket ) 63 { 64 this.id = id; 65 this.socket = socket; 66 this.kernel = kernel; 67 } 68 69 public Kernel getKernel() 70 { 71 return kernel; 72 } 73 74 public void close() 75 { 76 close(false); 77 } 78 79 public void close( boolean flushData ) 80 { 81 if( flushData ) { 82 closing = true; 83 84 // Enqueue a close marker message to let the server 85 // know we should close 86 send( CLOSE_MARKER, false, true ); 87 88 return; 89 } 90 91 try { 92 // Note: even though we may be disconnected from the socket.isConnected() 93 // standpoint, it's still safest to tell the kernel so that it can be sure 94 // to stop managing us gracefully. 95 kernel.closeEndpoint(this); 96 } catch( IOException e ) { 97 throw new KernelException( "Error closing endpoint for socket:" + socket, e ); 98 } 99 } 100 101 public long getId() 102 { 103 return id; 104 } 105 106 public String getAddress() 107 { 108 return String.valueOf(socket.socket().getRemoteSocketAddress()); 109 } 110 111 public boolean isConnected() 112 { 113 return socket.isConnected(); 114 } 115 116 /** 117 * The wakeup option is used internally when the kernel is 118 * broadcasting out to a bunch of endpoints and doesn't want to 119 * necessarily wakeup right away. 120 */ 121 protected void send( ByteBuffer data, boolean copy, boolean wakeup ) 122 { 123 // We create a ByteBuffer per endpoint since we 124 // use it to track the data sent to each endpoint 125 // separately. 126 ByteBuffer buffer; 127 if( !copy ) { 128 buffer = data; 129 } else { 130 // Copy the buffer 131 buffer = ByteBuffer.allocate(data.remaining()); 132 buffer.put(data); 133 buffer.flip(); 134 } 135 136 // Queue it up 137 outbound.add(buffer); 138 139 if( wakeup ) 140 kernel.wakeupSelector(); 141 } 142 143 /** 144 * Called by the SelectorKernel to get the current top 145 * buffer for writing. 146 */ 147 protected ByteBuffer peekPending() 148 { 149 return outbound.peek(); 150 } 151 152 /** 153 * Called by the SelectorKernel when the top buffer 154 * has been exhausted. 155 */ 156 protected ByteBuffer removePending() 157 { 158 return outbound.poll(); 159 } 160 161 protected boolean hasPending() 162 { 163 return !outbound.isEmpty(); 164 } 165 166 public void send( ByteBuffer data ) 167 { 168 if( data == null ) { 169 throw new IllegalArgumentException( "Data cannot be null." ); 170 } 171 if( closing ) { 172 throw new KernelException( "Endpoint has been closed:" + socket ); 173 } 174 send( data, true, true ); 175 } 176 177 public String toString() 178 { 179 return "NioEndpoint[" + id + ", " + socket + "]"; 180 } 181 } 182