Home | History | Annotate | Download | only in stack
      1 /*
      2 * Conditions Of Use
      3 *
      4 * This software was developed by employees of the National Institute of
      5 * Standards and Technology (NIST), an agency of the Federal Government.
      6 * Pursuant to title 15 United States Code Section 105, works of NIST
      7 * employees are not subject to copyright protection in the United States
      8 * and are considered to be in the public domain.  As a result, a formal
      9 * license is not needed to use the software.
     10 *
     11 * This software is provided by NIST as a service and is expressly
     12 * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
     13 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
     14 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
     15 * AND DATA ACCURACY.  NIST does not warrant or make any representations
     16 * regarding the use of the software or the results thereof, including but
     17 * not limited to the correctness, accuracy, reliability or usefulness of
     18 * the software.
     19 *
     20 * Permission to use this software is contingent upon your acceptance
     21 * of the terms of this agreement
     22 *
     23 * .
     24 *
     25 */
     26 /*******************************************************************************
     27  *   Product of NIST/ITL Advanced Networking Technologies Division (ANTD).     *
     28  *******************************************************************************/
     29 package gov.nist.javax.sip.stack;
     30 
     31 import java.io.IOException;
     32 import java.util.LinkedList;
     33 import java.net.*;
     34 
     35 import gov.nist.core.*;
     36 
     37 /**
     38  * Sit in a loop and handle incoming UDP datagram messages. For each datagram
     39  * packet, a new UDPMessageChannel is created (up to the max thread pool size).
     40  * Each UDP message is processed in its own thread.
     41  *
     42  * @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $
     43  *
     44  * @author M. Ranganathan  <br/>
     45  *
     46  *
     47  *
     48  * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg">
     49  * See the implementation sequence diagram for processing incoming requests.
     50  * </a>
     51  *
     52  *
     53  * Acknowledgement: Jeff Keyser contributed ideas on starting and stopping the
     54  * stack that were incorporated into this code. Niklas Uhrberg suggested that
     55  * thread pooling be added to limit the number of threads and improve
     56  * performance.
     57  */
     58 public class UDPMessageProcessor extends MessageProcessor {
     59     /**
     60      * The Mapped port (in case STUN suport is enabled)
     61      */
     62     private int port;
     63 
     64     /**
     65      * Incoming messages are queued here.
     66      */
     67     protected LinkedList messageQueue;
     68 
     69     /**
     70      * A list of message channels that we have started.
     71      */
     72     protected LinkedList messageChannels;
     73 
     74     /**
     75      * Max # of udp message channels
     76      */
     77     protected int threadPoolSize;
     78 
     79     protected DatagramSocket sock;
     80 
     81     /**
     82      * A flag that is set to false to exit the message processor (suggestion by
     83      * Jeff Keyser).
     84      */
     85     protected boolean isRunning;
     86 
     87     private static final int HIGHWAT=5000;
     88 
     89     private static final int LOWAT=2500;
     90 
     91     /**
     92      * Constructor.
     93      *
     94      * @param sipStack
     95      *            pointer to the stack.
     96      */
     97     protected UDPMessageProcessor(InetAddress ipAddress,
     98             SIPTransactionStack sipStack, int port) throws IOException {
     99         super(ipAddress, port, "udp",sipStack);
    100 
    101         this.sipStack = sipStack;
    102 
    103         this.messageQueue = new LinkedList();
    104 
    105         this.port = port;
    106         try {
    107             this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
    108                     ipAddress);
    109             // Create a new datagram socket.
    110             sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize());
    111             sock.setSendBufferSize(sipStack.getSendUdpBufferSize());
    112 
    113             /**
    114              * If the thread auditor is enabled, define a socket timeout value in order to
    115              * prevent sock.receive() from blocking forever
    116              */
    117             if (sipStack.getThreadAuditor().isEnabled()) {
    118                 sock.setSoTimeout((int) sipStack.getThreadAuditor().getPingIntervalInMillisecs());
    119             }
    120             if ( ipAddress.getHostAddress().equals(IN_ADDR_ANY)  ||
    121                  ipAddress.getHostAddress().equals(IN6_ADDR_ANY)){
    122                 // Store the address to which we are actually bound
    123                 // Note that on WINDOWS this is actually broken. It will
    124                 // return IN_ADDR_ANY again. On linux it will return the
    125                 // address to which the socket was actually bound.
    126                 super.setIpAddress( sock.getLocalAddress() );
    127 
    128             }
    129         } catch (SocketException ex) {
    130             throw new IOException(ex.getMessage());
    131         }
    132     }
    133 
    134 
    135 
    136     /**
    137      * Get port on which to listen for incoming stuff.
    138      *
    139      * @return port on which I am listening.
    140      */
    141     public int getPort() {
    142         return this.port;
    143     }
    144 
    145     /**
    146      * Start our processor thread.
    147      */
    148     public void start() throws IOException {
    149 
    150 
    151         this.isRunning = true;
    152         Thread thread = new Thread(this);
    153         thread.setDaemon(true);
    154         // Issue #32 on java.net
    155         thread.setName("UDPMessageProcessorThread");
    156         // Issue #184
    157         thread.setPriority(Thread.MAX_PRIORITY);
    158         thread.start();
    159     }
    160 
    161     /**
    162      * Thread main routine.
    163      */
    164     public void run() {
    165         // Check for running flag.
    166         this.messageChannels = new LinkedList();
    167         // start all our messageChannels (unless the thread pool size is
    168         // infinity.
    169         if (sipStack.threadPoolSize != -1) {
    170             for (int i = 0; i < sipStack.threadPoolSize; i++) {
    171                 UDPMessageChannel channel = new UDPMessageChannel(sipStack,
    172                         this);
    173                 this.messageChannels.add(channel);
    174 
    175             }
    176         }
    177 
    178         // Ask the auditor to monitor this thread
    179         ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor().addCurrentThread();
    180 
    181         // Somebody asked us to exit. if isRunnning is set to false.
    182         while (this.isRunning) {
    183 
    184             try {
    185                 // Let the thread auditor know we're up and running
    186                 threadHandle.ping();
    187 
    188                 int bufsize = sock.getReceiveBufferSize();
    189                 byte message[] = new byte[bufsize];
    190                 DatagramPacket packet = new DatagramPacket(message, bufsize);
    191                 sock.receive(packet);
    192 
    193 
    194 
    195              // This is a simplistic congestion control algorithm.
    196              // It accepts packets if queuesize is < LOWAT. It drops
    197              // requests if the queue size exceeds a HIGHWAT and accepts
    198              // requests with probability p proportional to the difference
    199              // between current queue size and LOWAT in the range
    200              // of queue sizes between HIGHWAT and LOWAT.
    201              // TODO -- penalize spammers by looking at the source
    202              // port and IP address.
    203              if ( sipStack.stackDoesCongestionControl ) {
    204              if ( this.messageQueue.size() >= HIGHWAT) {
    205                     if (sipStack.isLoggingEnabled()) {
    206                         sipStack.getStackLogger().logDebug("Dropping message -- queue length exceeded");
    207 
    208                     }
    209                     //System.out.println("HIGHWAT Drop!");
    210                     continue;
    211                 } else if ( this.messageQueue.size() > LOWAT && this .messageQueue.size() < HIGHWAT ) {
    212                     // Drop the message with a probabilty that is linear in the range 0 to 1
    213                     float threshold = ((float)(messageQueue.size() - LOWAT))/ ((float)(HIGHWAT - LOWAT));
    214                     boolean decision = Math.random() > 1.0 - threshold;
    215                     if ( decision ) {
    216                         if (sipStack.isLoggingEnabled()) {
    217                             sipStack.getStackLogger().logDebug("Dropping message with probability  " + (1.0 - threshold));
    218 
    219                         }
    220                         //System.out.println("RED Drop!");
    221                         continue;
    222                     }
    223 
    224                 }
    225              }
    226 
    227 
    228 
    229                 // Count of # of packets in process.
    230                 // this.useCount++;
    231                 if (sipStack.threadPoolSize != -1) {
    232                     // Note: the only condition watched for by threads
    233                     // synchronizing on the messageQueue member is that it is
    234                     // not empty. As soon as you introduce some other
    235                     // condition you will have to call notifyAll instead of
    236                     // notify below.
    237 
    238                     synchronized (this.messageQueue) {
    239                         // was addLast
    240                         this.messageQueue.add(packet);
    241                         this.messageQueue.notify();
    242                     }
    243                 } else {
    244                     new UDPMessageChannel(sipStack, this, packet);
    245                 }
    246             } catch (SocketTimeoutException ex) {
    247               // This socket timeout alows us to ping the thread auditor periodically
    248             } catch (SocketException ex) {
    249                 if (sipStack.isLoggingEnabled())
    250                     getSIPStack().getStackLogger()
    251                             .logDebug("UDPMessageProcessor: Stopping");
    252                 isRunning = false;
    253                 // The notifyAll should be in a synchronized block.
    254                 // ( bug report by Niklas Uhrberg ).
    255                 synchronized (this.messageQueue) {
    256                     this.messageQueue.notifyAll();
    257                 }
    258             } catch (IOException ex) {
    259                 isRunning = false;
    260                 ex.printStackTrace();
    261                 if (sipStack.isLoggingEnabled())
    262                     getSIPStack().getStackLogger()
    263                             .logDebug("UDPMessageProcessor: Got an IO Exception");
    264             } catch (Exception ex) {
    265                 if (sipStack.isLoggingEnabled())
    266                     getSIPStack().getStackLogger()
    267                             .logDebug("UDPMessageProcessor: Unexpected Exception - quitting");
    268                 InternalErrorHandler.handleException(ex);
    269                 return;
    270             }
    271         }
    272     }
    273 
    274     /**
    275      * Shut down the message processor. Close the socket for recieving incoming
    276      * messages.
    277      */
    278     public void stop() {
    279         synchronized (this.messageQueue) {
    280             this.isRunning = false;
    281             this.messageQueue.notifyAll();
    282             sock.close();
    283 
    284 
    285         }
    286     }
    287 
    288     /**
    289      * Return the transport string.
    290      *
    291      * @return the transport string
    292      */
    293     public String getTransport() {
    294         return "udp";
    295     }
    296 
    297     /**
    298      * Returns the stack.
    299      *
    300      * @return my sip stack.
    301      */
    302     public SIPTransactionStack getSIPStack() {
    303         return sipStack;
    304     }
    305 
    306     /**
    307      * Create and return new TCPMessageChannel for the given host/port.
    308      */
    309     public MessageChannel createMessageChannel(HostPort targetHostPort)
    310             throws UnknownHostException {
    311         return new UDPMessageChannel(targetHostPort.getInetAddress(),
    312                 targetHostPort.getPort(), sipStack, this);
    313     }
    314 
    315     public MessageChannel createMessageChannel(InetAddress host, int port)
    316             throws IOException {
    317         return new UDPMessageChannel(host, port, sipStack, this);
    318     }
    319 
    320     /**
    321      * Default target port for UDP
    322      */
    323     public int getDefaultTargetPort() {
    324         return 5060;
    325     }
    326 
    327     /**
    328      * UDP is not a secure protocol.
    329      */
    330     public boolean isSecure() {
    331         return false;
    332     }
    333 
    334     /**
    335      * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
    336      */
    337     public int getMaximumMessageSize() {
    338         return 8*1024;
    339     }
    340 
    341     /**
    342      * Return true if there are any messages in use.
    343      */
    344     public boolean inUse() {
    345         synchronized (messageQueue) {
    346             return messageQueue.size() != 0;
    347         }
    348     }
    349 
    350 }
    351