1 /* 2 * Conditions Of Use 3 * 4 * This software was developed by employees of the National Institute of 5 * Standards and Technology (NIST), an agency of the Federal Government. 6 * Pursuant to title 15 Untied States Code Section 105, works of NIST 7 * employees are not subject to copyright protection in the United States 8 * and are considered to be in the public domain. As a result, a formal 9 * license is not needed to use the software. 10 * 11 * This software is provided by NIST as a service and is expressly 12 * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED 13 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF 14 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT 15 * AND DATA ACCURACY. NIST does not warrant or make any representations 16 * regarding the use of the software or the results thereof, including but 17 * not limited to the correctness, accuracy, reliability or usefulness of 18 * the software. 19 * 20 * Permission to use this software is contingent upon your acceptance 21 * of the terms of this agreement 22 * 23 * . 24 * 25 */ 26 /******************************************************************************* 27 * Product of NIST/ITL Advanced Networking Technologies Division (ANTD). * 28 *******************************************************************************/ 29 package gov.nist.javax.sip.stack; 30 31 import java.io.IOException; 32 import java.util.LinkedList; 33 import java.net.*; 34 35 import gov.nist.core.*; 36 37 /** 38 * Sit in a loop and handle incoming udp datagram messages. For each Datagram 39 * packet, a new UDPMessageChannel is created (upto the max thread pool size). 40 * Each UDP message is processed in its own thread). 41 * 42 * @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $ 43 * 44 * @author M. Ranganathan <br/> 45 * 46 * 47 * 48 * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg"> 49 * See the implementation sequence diagram for processing incoming requests. 50 * </a> 51 * 52 * 53 * Acknowledgement: Jeff Keyser contributed ideas on starting and stoppping the 54 * stack that were incorporated into this code. Niklas Uhrberg suggested that 55 * thread pooling be added to limit the number of threads and improve 56 * performance. 57 */ 58 public class UDPMessageProcessor extends MessageProcessor { 59 /** 60 * The Mapped port (in case STUN suport is enabled) 61 */ 62 private int port; 63 64 /** 65 * Incoming messages are queued here. 66 */ 67 protected LinkedList messageQueue; 68 69 /** 70 * A list of message channels that we have started. 71 */ 72 protected LinkedList messageChannels; 73 74 /** 75 * Max # of udp message channels 76 */ 77 protected int threadPoolSize; 78 79 protected DatagramSocket sock; 80 81 /** 82 * A flag that is set to false to exit the message processor (suggestion by 83 * Jeff Keyser). 84 */ 85 protected boolean isRunning; 86 87 private static final int HIGHWAT=5000; 88 89 private static final int LOWAT=2500; 90 91 /** 92 * Constructor. 93 * 94 * @param sipStack 95 * pointer to the stack. 96 */ 97 protected UDPMessageProcessor(InetAddress ipAddress, 98 SIPTransactionStack sipStack, int port) throws IOException { 99 super(ipAddress, port, "udp",sipStack); 100 101 this.sipStack = sipStack; 102 103 this.messageQueue = new LinkedList(); 104 105 this.port = port; 106 try { 107 this.sock = sipStack.getNetworkLayer().createDatagramSocket(port, 108 ipAddress); 109 // Create a new datagram socket. 110 sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize()); 111 sock.setSendBufferSize(sipStack.getSendUdpBufferSize()); 112 113 /** 114 * If the thread auditor is enabled, define a socket timeout value in order to 115 * prevent sock.receive() from blocking forever 116 */ 117 if (sipStack.getThreadAuditor().isEnabled()) { 118 sock.setSoTimeout((int) sipStack.getThreadAuditor().getPingIntervalInMillisecs()); 119 } 120 if ( ipAddress.getHostAddress().equals(IN_ADDR_ANY) || 121 ipAddress.getHostAddress().equals(IN6_ADDR_ANY)){ 122 // Store the address to which we are actually bound 123 // Note that on WINDOWS this is actually broken. It will 124 // return IN_ADDR_ANY again. On linux it will return the 125 // address to which the socket was actually bound. 126 super.setIpAddress( sock.getLocalAddress() ); 127 128 } 129 } catch (SocketException ex) { 130 throw new IOException(ex.getMessage()); 131 } 132 } 133 134 135 136 /** 137 * Get port on which to listen for incoming stuff. 138 * 139 * @return port on which I am listening. 140 */ 141 public int getPort() { 142 return this.port; 143 } 144 145 /** 146 * Start our processor thread. 147 */ 148 public void start() throws IOException { 149 150 151 this.isRunning = true; 152 Thread thread = new Thread(this); 153 thread.setDaemon(true); 154 // Issue #32 on java.net 155 thread.setName("UDPMessageProcessorThread"); 156 // Issue #184 157 thread.setPriority(Thread.MAX_PRIORITY); 158 thread.start(); 159 } 160 161 /** 162 * Thread main routine. 163 */ 164 public void run() { 165 // Check for running flag. 166 this.messageChannels = new LinkedList(); 167 // start all our messageChannels (unless the thread pool size is 168 // infinity. 169 if (sipStack.threadPoolSize != -1) { 170 for (int i = 0; i < sipStack.threadPoolSize; i++) { 171 UDPMessageChannel channel = new UDPMessageChannel(sipStack, 172 this); 173 this.messageChannels.add(channel); 174 175 } 176 } 177 178 // Ask the auditor to monitor this thread 179 ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor().addCurrentThread(); 180 181 // Somebody asked us to exit. if isRunnning is set to false. 182 while (this.isRunning) { 183 184 try { 185 // Let the thread auditor know we're up and running 186 threadHandle.ping(); 187 188 int bufsize = sock.getReceiveBufferSize(); 189 byte message[] = new byte[bufsize]; 190 DatagramPacket packet = new DatagramPacket(message, bufsize); 191 sock.receive(packet); 192 193 194 195 // This is a simplistic congestion control algorithm. 196 // It accepts packets if queuesize is < LOWAT. It drops 197 // requests if the queue size exceeds a HIGHWAT and accepts 198 // requests with probability p proportional to the difference 199 // between current queue size and LOWAT in the range 200 // of queue sizes between HIGHWAT and LOWAT. 201 // TODO -- penalize spammers by looking at the source 202 // port and IP address. 203 if ( sipStack.stackDoesCongestionControl ) { 204 if ( this.messageQueue.size() >= HIGHWAT) { 205 if (sipStack.isLoggingEnabled()) { 206 sipStack.getStackLogger().logDebug("Dropping message -- queue length exceeded"); 207 208 } 209 //System.out.println("HIGHWAT Drop!"); 210 continue; 211 } else if ( this.messageQueue.size() > LOWAT && this .messageQueue.size() < HIGHWAT ) { 212 // Drop the message with a probabilty that is linear in the range 0 to 1 213 float threshold = ((float)(messageQueue.size() - LOWAT))/ ((float)(HIGHWAT - LOWAT)); 214 boolean decision = Math.random() > 1.0 - threshold; 215 if ( decision ) { 216 if (sipStack.isLoggingEnabled()) { 217 sipStack.getStackLogger().logDebug("Dropping message with probability " + (1.0 - threshold)); 218 219 } 220 //System.out.println("RED Drop!"); 221 continue; 222 } 223 224 } 225 } 226 227 228 229 // Count of # of packets in process. 230 // this.useCount++; 231 if (sipStack.threadPoolSize != -1) { 232 // Note: the only condition watched for by threads 233 // synchronizing on the messageQueue member is that it is 234 // not empty. As soon as you introduce some other 235 // condition you will have to call notifyAll instead of 236 // notify below. 237 238 synchronized (this.messageQueue) { 239 // was addLast 240 this.messageQueue.add(packet); 241 this.messageQueue.notify(); 242 } 243 } else { 244 new UDPMessageChannel(sipStack, this, packet); 245 } 246 } catch (SocketTimeoutException ex) { 247 // This socket timeout alows us to ping the thread auditor periodically 248 } catch (SocketException ex) { 249 if (sipStack.isLoggingEnabled()) 250 getSIPStack().getStackLogger() 251 .logDebug("UDPMessageProcessor: Stopping"); 252 isRunning = false; 253 // The notifyAll should be in a synchronized block. 254 // ( bug report by Niklas Uhrberg ). 255 synchronized (this.messageQueue) { 256 this.messageQueue.notifyAll(); 257 } 258 } catch (IOException ex) { 259 isRunning = false; 260 ex.printStackTrace(); 261 if (sipStack.isLoggingEnabled()) 262 getSIPStack().getStackLogger() 263 .logDebug("UDPMessageProcessor: Got an IO Exception"); 264 } catch (Exception ex) { 265 if (sipStack.isLoggingEnabled()) 266 getSIPStack().getStackLogger() 267 .logDebug("UDPMessageProcessor: Unexpected Exception - quitting"); 268 InternalErrorHandler.handleException(ex); 269 return; 270 } 271 } 272 } 273 274 /** 275 * Shut down the message processor. Close the socket for recieving incoming 276 * messages. 277 */ 278 public void stop() { 279 synchronized (this.messageQueue) { 280 this.isRunning = false; 281 this.messageQueue.notifyAll(); 282 sock.close(); 283 284 285 } 286 } 287 288 /** 289 * Return the transport string. 290 * 291 * @return the transport string 292 */ 293 public String getTransport() { 294 return "udp"; 295 } 296 297 /** 298 * Returns the stack. 299 * 300 * @return my sip stack. 301 */ 302 public SIPTransactionStack getSIPStack() { 303 return sipStack; 304 } 305 306 /** 307 * Create and return new TCPMessageChannel for the given host/port. 308 */ 309 public MessageChannel createMessageChannel(HostPort targetHostPort) 310 throws UnknownHostException { 311 return new UDPMessageChannel(targetHostPort.getInetAddress(), 312 targetHostPort.getPort(), sipStack, this); 313 } 314 315 public MessageChannel createMessageChannel(InetAddress host, int port) 316 throws IOException { 317 return new UDPMessageChannel(host, port, sipStack, this); 318 } 319 320 /** 321 * Default target port for UDP 322 */ 323 public int getDefaultTargetPort() { 324 return 5060; 325 } 326 327 /** 328 * UDP is not a secure protocol. 329 */ 330 public boolean isSecure() { 331 return false; 332 } 333 334 /** 335 * UDP can handle a message as large as the MAX_DATAGRAM_SIZE. 336 */ 337 public int getMaximumMessageSize() { 338 return 8*1024; 339 } 340 341 /** 342 * Return true if there are any messages in use. 343 */ 344 public boolean inUse() { 345 synchronized (messageQueue) { 346 return messageQueue.size() != 0; 347 } 348 } 349 350 } 351