1 /* 2 * Copyright (c) 2006-2011 Christian Plattner. All rights reserved. 3 * Please refer to the LICENSE.txt for licensing details. 4 */ 5 6 package ch.ethz.ssh2.transport; 7 8 import java.io.IOException; 9 import java.io.InputStream; 10 import java.io.OutputStream; 11 import java.net.InetAddress; 12 import java.net.InetSocketAddress; 13 import java.net.Socket; 14 import java.net.SocketTimeoutException; 15 import java.net.UnknownHostException; 16 import java.security.SecureRandom; 17 import java.util.List; 18 import java.util.Vector; 19 20 import ch.ethz.ssh2.ConnectionInfo; 21 import ch.ethz.ssh2.ConnectionMonitor; 22 import ch.ethz.ssh2.DHGexParameters; 23 import ch.ethz.ssh2.HTTPProxyData; 24 import ch.ethz.ssh2.HTTPProxyException; 25 import ch.ethz.ssh2.ProxyData; 26 import ch.ethz.ssh2.ServerHostKeyVerifier; 27 import ch.ethz.ssh2.crypto.Base64; 28 import ch.ethz.ssh2.crypto.CryptoWishList; 29 import ch.ethz.ssh2.crypto.cipher.BlockCipher; 30 import ch.ethz.ssh2.crypto.digest.MAC; 31 import ch.ethz.ssh2.log.Logger; 32 import ch.ethz.ssh2.packets.PacketDisconnect; 33 import ch.ethz.ssh2.packets.Packets; 34 import ch.ethz.ssh2.packets.TypesReader; 35 import ch.ethz.ssh2.util.StringEncoder; 36 import ch.ethz.ssh2.util.Tokenizer; 37 38 /* 39 * Yes, the "standard" is a big mess. On one side, the say that arbitary channel 40 * packets are allowed during kex exchange, on the other side we need to blindly 41 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that 42 * the next packet is not a channel data packet? Yes, we could check if it is in 43 * the KEX range. But the standard says nothing about this. The OpenSSH guys 44 * block local "normal" traffic during KEX. That's fine - however, they assume 45 * that the other side is doing the same. During re-key, if they receive traffic 46 * other than KEX, they become horribly irritated and kill the connection. Since 47 * we are very likely going to communicate with OpenSSH servers, we have to play 48 * the same game - even though we could do better. 49 * 50 * btw: having stdout and stderr on the same channel, with a shared window, is 51 * also a VERY good idea... =( 52 */ 53 54 /** 55 * TransportManager. 56 * 57 * @author Christian Plattner 58 * @version $Id: TransportManager.java 41 2011-06-02 10:36:41Z dkocher (at) sudo.ch $ 59 */ 60 public class TransportManager 61 { 62 private static final Logger log = Logger.getLogger(TransportManager.class); 63 64 private static class HandlerEntry 65 { 66 MessageHandler mh; 67 int low; 68 int high; 69 } 70 71 private final List<byte[]> asynchronousQueue = new Vector<byte[]>(); 72 private Thread asynchronousThread = null; 73 74 class AsynchronousWorker extends Thread 75 { 76 @Override 77 public void run() 78 { 79 while (true) 80 { 81 byte[] msg = null; 82 83 synchronized (asynchronousQueue) 84 { 85 if (asynchronousQueue.size() == 0) 86 { 87 /* After the queue is empty for about 2 seconds, stop this thread */ 88 89 try 90 { 91 asynchronousQueue.wait(2000); 92 } 93 catch (InterruptedException ignore) 94 { 95 } 96 97 if (asynchronousQueue.size() == 0) 98 { 99 asynchronousThread = null; 100 return; 101 } 102 } 103 104 msg = asynchronousQueue.remove(0); 105 } 106 107 /* The following invocation may throw an IOException. 108 * There is no point in handling it - it simply means 109 * that the connection has a problem and we should stop 110 * sending asynchronously messages. We do not need to signal that 111 * we have exited (asynchronousThread = null): further 112 * messages in the queue cannot be sent by this or any 113 * other thread. 114 * Other threads will sooner or later (when receiving or 115 * sending the next message) get the same IOException and 116 * get to the same conclusion. 117 */ 118 119 try 120 { 121 sendMessage(msg); 122 } 123 catch (IOException e) 124 { 125 return; 126 } 127 } 128 } 129 } 130 131 private String hostname; 132 private int port; 133 private final Socket sock = new Socket(); 134 135 private final Object connectionSemaphore = new Object(); 136 137 private boolean flagKexOngoing = false; 138 private boolean connectionClosed = false; 139 140 private Throwable reasonClosedCause = null; 141 142 private TransportConnection tc; 143 private KexManager km; 144 145 private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>(); 146 147 private Thread receiveThread; 148 149 private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>(); 150 private boolean monitorsWereInformed = false; 151 152 /** 153 * There were reports that there are JDKs which use 154 * the resolver even though one supplies a dotted IP 155 * address in the Socket constructor. That is why we 156 * try to generate the InetAdress "by hand". 157 * 158 * @param host 159 * @return the InetAddress 160 * @throws UnknownHostException 161 */ 162 private InetAddress createInetAddress(String host) throws UnknownHostException 163 { 164 /* Check if it is a dotted IP4 address */ 165 166 InetAddress addr = parseIPv4Address(host); 167 168 if (addr != null) 169 { 170 return addr; 171 } 172 173 return InetAddress.getByName(host); 174 } 175 176 private InetAddress parseIPv4Address(String host) throws UnknownHostException 177 { 178 if (host == null) 179 { 180 return null; 181 } 182 183 String[] quad = Tokenizer.parseTokens(host, '.'); 184 185 if ((quad == null) || (quad.length != 4)) 186 { 187 return null; 188 } 189 190 byte[] addr = new byte[4]; 191 192 for (int i = 0; i < 4; i++) 193 { 194 int part = 0; 195 196 if ((quad[i].length() == 0) || (quad[i].length() > 3)) 197 { 198 return null; 199 } 200 201 for (int k = 0; k < quad[i].length(); k++) 202 { 203 char c = quad[i].charAt(k); 204 205 /* No, Character.isDigit is not the same */ 206 if ((c < '0') || (c > '9')) 207 { 208 return null; 209 } 210 211 part = part * 10 + (c - '0'); 212 } 213 214 if (part > 255) /* 300.1.2.3 is invalid =) */ 215 { 216 return null; 217 } 218 219 addr[i] = (byte) part; 220 } 221 222 return InetAddress.getByAddress(host, addr); 223 } 224 225 public TransportManager(String host, int port) throws IOException 226 { 227 this.hostname = host; 228 this.port = port; 229 } 230 231 public int getPacketOverheadEstimate() 232 { 233 return tc.getPacketOverheadEstimate(); 234 } 235 236 public void setTcpNoDelay(boolean state) throws IOException 237 { 238 sock.setTcpNoDelay(state); 239 } 240 241 public void setSoTimeout(int timeout) throws IOException 242 { 243 sock.setSoTimeout(timeout); 244 } 245 246 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException 247 { 248 return km.getOrWaitForConnectionInfo(kexNumber); 249 } 250 251 public Throwable getReasonClosedCause() 252 { 253 synchronized (connectionSemaphore) 254 { 255 return reasonClosedCause; 256 } 257 } 258 259 public byte[] getSessionIdentifier() 260 { 261 return km.sessionId; 262 } 263 264 public void close(Throwable cause, boolean useDisconnectPacket) 265 { 266 if (useDisconnectPacket == false) 267 { 268 /* OK, hard shutdown - do not aquire the semaphore, 269 * perhaps somebody is inside (and waits until the remote 270 * side is ready to accept new data). */ 271 272 try 273 { 274 sock.close(); 275 } 276 catch (IOException ignore) 277 { 278 } 279 280 /* OK, whoever tried to send data, should now agree that 281 * there is no point in further waiting =) 282 * It is safe now to aquire the semaphore. 283 */ 284 } 285 286 synchronized (connectionSemaphore) 287 { 288 if (connectionClosed == false) 289 { 290 if (useDisconnectPacket == true) 291 { 292 try 293 { 294 byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "") 295 .getPayload(); 296 if (tc != null) 297 { 298 tc.sendMessage(msg); 299 } 300 } 301 catch (IOException ignore) 302 { 303 } 304 305 try 306 { 307 sock.close(); 308 } 309 catch (IOException ignore) 310 { 311 } 312 } 313 314 connectionClosed = true; 315 reasonClosedCause = cause; /* may be null */ 316 } 317 connectionSemaphore.notifyAll(); 318 } 319 320 /* No check if we need to inform the monitors */ 321 322 List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>(); 323 324 synchronized (this) 325 { 326 /* Short term lock to protect "connectionMonitors" 327 * and "monitorsWereInformed" 328 * (they may be modified concurrently) 329 */ 330 331 if (monitorsWereInformed == false) 332 { 333 monitorsWereInformed = true; 334 monitors.addAll(connectionMonitors); 335 } 336 } 337 338 for (ConnectionMonitor cmon : monitors) 339 { 340 try 341 { 342 cmon.connectionLost(reasonClosedCause); 343 } 344 catch (Exception ignore) 345 { 346 } 347 } 348 } 349 350 private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException 351 { 352 /* See the comment for createInetAddress() */ 353 354 if (proxyData == null) 355 { 356 InetAddress addr = createInetAddress(hostname); 357 sock.connect(new InetSocketAddress(addr, port), connectTimeout); 358 return; 359 } 360 361 if (proxyData instanceof HTTPProxyData) 362 { 363 HTTPProxyData pd = (HTTPProxyData) proxyData; 364 365 /* At the moment, we only support HTTP proxies */ 366 367 InetAddress addr = createInetAddress(pd.proxyHost); 368 sock.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout); 369 370 /* OK, now tell the proxy where we actually want to connect to */ 371 372 StringBuilder sb = new StringBuilder(); 373 374 sb.append("CONNECT "); 375 sb.append(hostname); 376 sb.append(':'); 377 sb.append(port); 378 sb.append(" HTTP/1.0\r\n"); 379 380 if ((pd.proxyUser != null) && (pd.proxyPass != null)) 381 { 382 String credentials = pd.proxyUser + ":" + pd.proxyPass; 383 char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials)); 384 sb.append("Proxy-Authorization: Basic "); 385 sb.append(encoded); 386 sb.append("\r\n"); 387 } 388 389 if (pd.requestHeaderLines != null) 390 { 391 for (int i = 0; i < pd.requestHeaderLines.length; i++) 392 { 393 if (pd.requestHeaderLines[i] != null) 394 { 395 sb.append(pd.requestHeaderLines[i]); 396 sb.append("\r\n"); 397 } 398 } 399 } 400 401 sb.append("\r\n"); 402 403 OutputStream out = sock.getOutputStream(); 404 405 out.write(StringEncoder.GetBytes(sb.toString())); 406 out.flush(); 407 408 /* Now parse the HTTP response */ 409 410 byte[] buffer = new byte[1024]; 411 InputStream in = sock.getInputStream(); 412 413 int len = ClientServerHello.readLineRN(in, buffer); 414 415 String httpReponse = StringEncoder.GetString(buffer, 0, len); 416 417 if (httpReponse.startsWith("HTTP/") == false) 418 { 419 throw new IOException("The proxy did not send back a valid HTTP response."); 420 } 421 422 /* "HTTP/1.X XYZ X" => 14 characters minimum */ 423 424 if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' ')) 425 { 426 throw new IOException("The proxy did not send back a valid HTTP response."); 427 } 428 429 int errorCode = 0; 430 431 try 432 { 433 errorCode = Integer.parseInt(httpReponse.substring(9, 12)); 434 } 435 catch (NumberFormatException ignore) 436 { 437 throw new IOException("The proxy did not send back a valid HTTP response."); 438 } 439 440 if ((errorCode < 0) || (errorCode > 999)) 441 { 442 throw new IOException("The proxy did not send back a valid HTTP response."); 443 } 444 445 if (errorCode != 200) 446 { 447 throw new HTTPProxyException(httpReponse.substring(13), errorCode); 448 } 449 450 /* OK, read until empty line */ 451 452 while (true) 453 { 454 len = ClientServerHello.readLineRN(in, buffer); 455 if (len == 0) 456 { 457 break; 458 } 459 } 460 return; 461 } 462 463 throw new IOException("Unsupported ProxyData"); 464 } 465 466 public void initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier, 467 DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData) 468 throws IOException 469 { 470 /* First, establish the TCP connection to the SSH-2 server */ 471 472 establishConnection(proxyData, connectTimeout); 473 474 /* Parse the server line and say hello - important: this information is later needed for the 475 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object 476 * for later use. 477 */ 478 479 ClientServerHello csh = new ClientServerHello(identification, sock.getInputStream(), sock.getOutputStream()); 480 481 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd); 482 483 km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd); 484 km.initiateKEX(cwl, dhgex); 485 486 receiveThread = new Thread(new Runnable() 487 { 488 public void run() 489 { 490 try 491 { 492 receiveLoop(); 493 } 494 catch (IOException e) 495 { 496 close(e, false); 497 498 log.warning("Receive thread: error in receiveLoop: " + e.getMessage()); 499 } 500 501 if (log.isDebugEnabled()) 502 { 503 log.debug("Receive thread: back from receiveLoop"); 504 } 505 506 /* Tell all handlers that it is time to say goodbye */ 507 508 if (km != null) 509 { 510 try 511 { 512 km.handleMessage(null, 0); 513 } 514 catch (IOException ignored) 515 { 516 } 517 } 518 519 for (HandlerEntry he : messageHandlers) 520 { 521 try 522 { 523 he.mh.handleMessage(null, 0); 524 } 525 catch (Exception ignore) 526 { 527 } 528 } 529 } 530 }); 531 532 receiveThread.setDaemon(true); 533 receiveThread.start(); 534 } 535 536 public void registerMessageHandler(MessageHandler mh, int low, int high) 537 { 538 HandlerEntry he = new HandlerEntry(); 539 he.mh = mh; 540 he.low = low; 541 he.high = high; 542 543 synchronized (messageHandlers) 544 { 545 messageHandlers.add(he); 546 } 547 } 548 549 public void removeMessageHandler(MessageHandler mh, int low, int high) 550 { 551 synchronized (messageHandlers) 552 { 553 for (int i = 0; i < messageHandlers.size(); i++) 554 { 555 HandlerEntry he = messageHandlers.get(i); 556 if ((he.mh == mh) && (he.low == low) && (he.high == high)) 557 { 558 messageHandlers.remove(i); 559 break; 560 } 561 } 562 } 563 } 564 565 public void sendKexMessage(byte[] msg) throws IOException 566 { 567 synchronized (connectionSemaphore) 568 { 569 if (connectionClosed) 570 { 571 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause); 572 } 573 574 flagKexOngoing = true; 575 576 try 577 { 578 tc.sendMessage(msg); 579 } 580 catch (IOException e) 581 { 582 close(e, false); 583 throw e; 584 } 585 } 586 } 587 588 public void kexFinished() throws IOException 589 { 590 synchronized (connectionSemaphore) 591 { 592 flagKexOngoing = false; 593 connectionSemaphore.notifyAll(); 594 } 595 } 596 597 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException 598 { 599 km.initiateKEX(cwl, dhgex); 600 } 601 602 public void changeRecvCipher(BlockCipher bc, MAC mac) 603 { 604 tc.changeRecvCipher(bc, mac); 605 } 606 607 public void changeSendCipher(BlockCipher bc, MAC mac) 608 { 609 tc.changeSendCipher(bc, mac); 610 } 611 612 public void sendAsynchronousMessage(byte[] msg) throws IOException 613 { 614 synchronized (asynchronousQueue) 615 { 616 asynchronousQueue.add(msg); 617 618 /* This limit should be flexible enough. We need this, otherwise the peer 619 * can flood us with global requests (and other stuff where we have to reply 620 * with an asynchronous message) and (if the server just sends data and does not 621 * read what we send) this will probably put us in a low memory situation 622 * (our send queue would grow and grow and...) */ 623 624 if (asynchronousQueue.size() > 100) 625 { 626 throw new IOException("Error: the peer is not consuming our asynchronous replies."); 627 } 628 629 /* Check if we have an asynchronous sending thread */ 630 631 if (asynchronousThread == null) 632 { 633 asynchronousThread = new AsynchronousWorker(); 634 asynchronousThread.setDaemon(true); 635 asynchronousThread.start(); 636 637 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */ 638 } 639 } 640 } 641 642 public void setConnectionMonitors(List<ConnectionMonitor> monitors) 643 { 644 synchronized (this) 645 { 646 connectionMonitors = new Vector<ConnectionMonitor>(); 647 connectionMonitors.addAll(monitors); 648 } 649 } 650 651 /** 652 * True if no response message expected. 653 */ 654 private boolean idle; 655 656 public void sendMessage(byte[] msg) throws IOException 657 { 658 if (Thread.currentThread() == receiveThread) 659 { 660 throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!"); 661 } 662 663 boolean wasInterrupted = false; 664 665 try 666 { 667 synchronized (connectionSemaphore) 668 { 669 while (true) 670 { 671 if (connectionClosed) 672 { 673 throw (IOException) new IOException("Sorry, this connection is closed.") 674 .initCause(reasonClosedCause); 675 } 676 677 if (flagKexOngoing == false) 678 { 679 break; 680 } 681 682 try 683 { 684 connectionSemaphore.wait(); 685 } 686 catch (InterruptedException e) 687 { 688 wasInterrupted = true; 689 } 690 } 691 692 try 693 { 694 tc.sendMessage(msg); 695 idle = false; 696 } 697 catch (IOException e) 698 { 699 close(e, false); 700 throw e; 701 } 702 } 703 } 704 finally 705 { 706 if (wasInterrupted) 707 Thread.currentThread().interrupt(); 708 } 709 } 710 711 public void receiveLoop() throws IOException 712 { 713 byte[] msg = new byte[35000]; 714 715 while (true) 716 { 717 int msglen; 718 try 719 { 720 msglen = tc.receiveMessage(msg, 0, msg.length); 721 } 722 catch (SocketTimeoutException e) 723 { 724 // Timeout in read 725 if (idle) 726 { 727 log.debug("Ignoring socket timeout"); 728 continue; 729 } 730 throw e; 731 } 732 idle = true; 733 734 int type = msg[0] & 0xff; 735 736 if (type == Packets.SSH_MSG_IGNORE) 737 { 738 continue; 739 } 740 741 if (type == Packets.SSH_MSG_DEBUG) 742 { 743 if (log.isDebugEnabled()) 744 { 745 TypesReader tr = new TypesReader(msg, 0, msglen); 746 tr.readByte(); 747 tr.readBoolean(); 748 StringBuilder debugMessageBuffer = new StringBuilder(); 749 debugMessageBuffer.append(tr.readString("UTF-8")); 750 751 for (int i = 0; i < debugMessageBuffer.length(); i++) 752 { 753 char c = debugMessageBuffer.charAt(i); 754 755 if ((c >= 32) && (c <= 126)) 756 { 757 continue; 758 } 759 debugMessageBuffer.setCharAt(i, '\uFFFD'); 760 } 761 762 log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'"); 763 } 764 continue; 765 } 766 767 if (type == Packets.SSH_MSG_UNIMPLEMENTED) 768 { 769 throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen."); 770 } 771 772 if (type == Packets.SSH_MSG_DISCONNECT) 773 { 774 TypesReader tr = new TypesReader(msg, 0, msglen); 775 tr.readByte(); 776 int reason_code = tr.readUINT32(); 777 StringBuilder reasonBuffer = new StringBuilder(); 778 reasonBuffer.append(tr.readString("UTF-8")); 779 780 /* 781 * Do not get fooled by servers that send abnormal long error 782 * messages 783 */ 784 785 if (reasonBuffer.length() > 255) 786 { 787 reasonBuffer.setLength(255); 788 reasonBuffer.setCharAt(254, '.'); 789 reasonBuffer.setCharAt(253, '.'); 790 reasonBuffer.setCharAt(252, '.'); 791 } 792 793 /* 794 * Also, check that the server did not send characters that may 795 * screw up the receiver -> restrict to reasonable US-ASCII 796 * subset -> "printable characters" (ASCII 32 - 126). Replace 797 * all others with 0xFFFD (UNICODE replacement character). 798 */ 799 800 for (int i = 0; i < reasonBuffer.length(); i++) 801 { 802 char c = reasonBuffer.charAt(i); 803 804 if ((c >= 32) && (c <= 126)) 805 { 806 continue; 807 } 808 reasonBuffer.setCharAt(i, '\uFFFD'); 809 } 810 811 throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): " 812 + reasonBuffer.toString()); 813 } 814 815 /* 816 * Is it a KEX Packet? 817 */ 818 819 if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS) 820 || ((type >= 30) && (type <= 49))) 821 { 822 km.handleMessage(msg, msglen); 823 continue; 824 } 825 826 MessageHandler mh = null; 827 828 for (int i = 0; i < messageHandlers.size(); i++) 829 { 830 HandlerEntry he = messageHandlers.get(i); 831 if ((he.low <= type) && (type <= he.high)) 832 { 833 mh = he.mh; 834 break; 835 } 836 } 837 838 if (mh == null) 839 { 840 throw new IOException("Unexpected SSH message (type " + type + ")"); 841 } 842 843 mh.handleMessage(msg, msglen); 844 } 845 } 846 } 847