Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (c) 2011 jMonkeyEngine
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are
      7  * met:
      8  *
      9  * * Redistributions of source code must retain the above copyright
     10  *   notice, this list of conditions and the following disclaimer.
     11  *
     12  * * Redistributions in binary form must reproduce the above copyright
     13  *   notice, this list of conditions and the following disclaimer in the
     14  *   documentation and/or other materials provided with the distribution.
     15  *
     16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
     17  *   may be used to endorse or promote products derived from this software
     18  *   without specific prior written permission.
     19  *
     20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
     22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31  */
     32 
     33 package com.jme3.network.base;
     34 
     35 import com.jme3.network.ErrorListener;
     36 import com.jme3.network.Message;
     37 import com.jme3.network.MessageListener;
     38 import com.jme3.network.kernel.Connector;
     39 import com.jme3.network.kernel.ConnectorException;
     40 import java.nio.ByteBuffer;
     41 import java.util.concurrent.ArrayBlockingQueue;
     42 import java.util.concurrent.BlockingQueue;
     43 import java.util.concurrent.atomic.AtomicBoolean;
     44 
     45 /**
     46  *  Wraps a single Connector and forwards new messages
     47  *  to the supplied message dispatcher.  This is used
     48  *  by DefaultClient to manage its connector objects.
     49  *  This is only responsible for message reading and provides
     50  *  no support for buffering writes.
     51  *
     52  *  <p>This adapter assumes a simple protocol where two
     53  *  bytes define a (short) object size with the object data
     54  *  to follow.  Note: this limits the size of serialized
     55  *  objects to 32676 bytes... even though, for example,
     56  *  datagram packets can hold twice that. :P</p>
     57  *
     58  *  @version   $Revision: 8944 $
     59  *  @author    Paul Speed
     60  */
     61 public class ConnectorAdapter extends Thread
     62 {
     63     private static final int OUTBOUND_BACKLOG = 16000;
     64 
     65     private Connector connector;
     66     private MessageListener<Object> dispatcher;
     67     private ErrorListener<Object> errorHandler;
     68     private AtomicBoolean go = new AtomicBoolean(true);
     69 
     70     private BlockingQueue<ByteBuffer> outbound;
     71 
     72     // Writes messages out on a background thread
     73     private WriterThread writer;
     74 
     75     // Marks the messages as reliable or not if they came
     76     // through this connector.
     77     private boolean reliable;
     78 
     79     public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
     80                              ErrorListener<Object> errorHandler, boolean reliable )
     81     {
     82         super( String.valueOf(connector) );
     83         this.connector = connector;
     84         this.dispatcher = dispatcher;
     85         this.errorHandler = errorHandler;
     86         this.reliable = reliable;
     87         setDaemon(true);
     88 
     89         // The backlog makes sure that the outbound channel blocks once
     90         // a certain backlog level is reached.  It is set high so that it
     91         // is only reached in the worst cases... which are usually things like
     92         // raw throughput tests.  Technically, a saturated TCP channel could
     93         // back up quite a bit if the buffers are full and the socket has
     94         // stalled but 16,000 messages is still a big backlog.
     95         outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
     96 
     97         // Note: this technically adds a potential deadlock case
     98         // with the above code where there wasn't one before.  For example,
     99         // if a TCP outbound queue fills to capacity and a client sends
    100         // in such a way that they block TCP message handling then if the HostedConnection
    101         // on the server is similarly blocked then the TCP network buffers may
    102         // all get full and no outbound messages move and we forever block
    103         // on the queue.
    104         // However, in practice this can't really happen... or at least it's
    105         // the sign of other really bad things.
    106         // First, currently the server-side outbound queues are all unbounded and
    107         // so won't ever block the handling of messages if the outbound channel is full.
    108         // Second, there would have to be a huge amount of data backlog for this
    109         // to ever occur anyway.
    110         // Third, it's a sign of a really poor architecture if 16,000 messages
    111         // can go out in a way that blocks reads.
    112 
    113         writer = new WriterThread();
    114         writer.start();
    115     }
    116 
    117     public void close()
    118     {
    119         go.set(false);
    120 
    121         // Kill the writer service
    122         writer.shutdown();
    123 
    124         if( connector.isConnected() )
    125             {
    126             // Kill the connector
    127             connector.close();
    128             }
    129     }
    130 
    131     protected void dispatch( Message m )
    132     {
    133         dispatcher.messageReceived( null, m );
    134     }
    135 
    136     public void write( ByteBuffer data )
    137     {
    138         try {
    139             outbound.put( data );
    140         } catch( InterruptedException e ) {
    141             throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
    142         }
    143     }
    144 
    145     protected void handleError( Exception e )
    146     {
    147         if( !go.get() )
    148             return;
    149 
    150         errorHandler.handleError( this, e );
    151     }
    152 
    153     public void run()
    154     {
    155         MessageProtocol protocol = new MessageProtocol();
    156 
    157         try {
    158             while( go.get() ) {
    159                 ByteBuffer buffer = connector.read();
    160                 if( buffer == null ) {
    161                     if( go.get() ) {
    162                         throw new ConnectorException( "Connector closed." );
    163                     } else {
    164                         // Just dump out because a null buffer is expected
    165                         // from a closed/closing connector
    166                         break;
    167                     }
    168                 }
    169 
    170                 protocol.addBuffer( buffer );
    171 
    172                 Message m = null;
    173                 while( (m = protocol.getMessage()) != null ) {
    174                     m.setReliable( reliable );
    175                     dispatch( m );
    176                 }
    177             }
    178         } catch( Exception e ) {
    179             handleError( e );
    180         }
    181     }
    182 
    183     protected class WriterThread extends Thread
    184     {
    185         public WriterThread()
    186         {
    187             super( String.valueOf(connector) + "-writer" );
    188         }
    189 
    190         public void shutdown()
    191         {
    192             interrupt();
    193         }
    194 
    195         private void write( ByteBuffer data )
    196         {
    197             try {
    198                 connector.write(data);
    199             } catch( Exception e ) {
    200                 handleError( e );
    201             }
    202         }
    203 
    204         public void run()
    205         {
    206             while( go.get() ) {
    207                 try {
    208                     ByteBuffer data = outbound.take();
    209                     write(data);
    210                 } catch( InterruptedException e ) {
    211                     if( !go.get() )
    212                         return;
    213                     throw new RuntimeException( "Interrupted waiting for data", e );
    214                 }
    215             }
    216         }
    217     }
    218 }
    219