Home | History | Annotate | Download | only in ssh2
      1 /*
      2  * Copyright (c) 2006-2011 Christian Plattner. All rights reserved.
      3  * Please refer to the LICENSE.txt for licensing details.
      4  */
      5 package ch.ethz.ssh2;
      6 
      7 import java.io.IOException;
      8 import java.io.InputStream;
      9 
     10 /**
     11  * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
     12  * thread to constantly consume input from another InputStream. It uses a buffer
     13  * to store the consumed data. The buffer size is automatically adjusted, if needed.
     14  * <p/>
     15  * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
     16  * InputStreams with instances of this class, then you don't have to bother about
     17  * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
     18  * since all arriving data will be immediatelly consumed by the worker threads.
     19  * Also, as a side effect, the streams will be buffered (e.g., single byte
     20  * read() operations are faster).
     21  * <p/>
     22  * Other SSH for Java libraries include this functionality by default in
     23  * their STDOUT and STDERR InputStream implementations, however, please be aware
     24  * that this approach has also a downside:
     25  * <p/>
     26  * If you do not call the StreamGobbler's <code>read()</code> method often enough
     27  * and the peer is constantly sending huge amounts of data, then you will sooner or later
     28  * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
     29  * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
     30  * <p/>
     31  * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
     32  * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
     33  *
     34  * @author Christian Plattner
     35  * @version 2.50, 03/15/10
     36  */
     37 
     38 public class StreamGobbler extends InputStream
     39 {
     40 	class GobblerThread extends Thread
     41 	{
     42 		@Override
     43 		public void run()
     44 		{
     45 			byte[] buff = new byte[8192];
     46 
     47 			while (true)
     48 			{
     49 				try
     50 				{
     51 					int avail = is.read(buff);
     52 
     53 					synchronized (synchronizer)
     54 					{
     55 						if (avail <= 0)
     56 						{
     57 							isEOF = true;
     58 							synchronizer.notifyAll();
     59 							break;
     60 						}
     61 
     62 						int space_available = buffer.length - write_pos;
     63 
     64 						if (space_available < avail)
     65 						{
     66 							/* compact/resize buffer */
     67 
     68 							int unread_size = write_pos - read_pos;
     69 							int need_space = unread_size + avail;
     70 
     71 							byte[] new_buffer = buffer;
     72 
     73 							if (need_space > buffer.length)
     74 							{
     75 								int inc = need_space / 3;
     76 								inc = (inc < 256) ? 256 : inc;
     77 								inc = (inc > 8192) ? 8192 : inc;
     78 								new_buffer = new byte[need_space + inc];
     79 							}
     80 
     81 							if (unread_size > 0)
     82 								System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);
     83 
     84 							buffer = new_buffer;
     85 
     86 							read_pos = 0;
     87 							write_pos = unread_size;
     88 						}
     89 
     90 						System.arraycopy(buff, 0, buffer, write_pos, avail);
     91 						write_pos += avail;
     92 
     93 						synchronizer.notifyAll();
     94 					}
     95 				}
     96 				catch (IOException e)
     97 				{
     98 					synchronized (synchronizer)
     99 					{
    100 						exception = e;
    101 						synchronizer.notifyAll();
    102 						break;
    103 					}
    104 				}
    105 			}
    106 		}
    107 	}
    108 
    109 	private InputStream is;
    110 
    111 	private final Object synchronizer = new Object();
    112 
    113 	private boolean isEOF = false;
    114 	private boolean isClosed = false;
    115 	private IOException exception = null;
    116 
    117 	private byte[] buffer = new byte[2048];
    118 	private int read_pos = 0;
    119 	private int write_pos = 0;
    120 
    121 	public StreamGobbler(InputStream is)
    122 	{
    123 		this.is = is;
    124 		GobblerThread t = new GobblerThread();
    125 		t.setDaemon(true);
    126 		t.start();
    127 	}
    128 
    129 	@Override
    130 	public int read() throws IOException
    131 	{
    132 		boolean wasInterrupted = false;
    133 
    134 		try
    135 		{
    136 			synchronized (synchronizer)
    137 			{
    138 				if (isClosed)
    139 					throw new IOException("This StreamGobbler is closed.");
    140 
    141 				while (read_pos == write_pos)
    142 				{
    143 					if (exception != null)
    144 						throw exception;
    145 
    146 					if (isEOF)
    147 						return -1;
    148 
    149 					try
    150 					{
    151 						synchronizer.wait();
    152 					}
    153 					catch (InterruptedException e)
    154 					{
    155 						wasInterrupted = true;
    156 					}
    157 				}
    158 				return buffer[read_pos++] & 0xff;
    159 			}
    160 		}
    161 		finally
    162 		{
    163 			if (wasInterrupted)
    164 				Thread.currentThread().interrupt();
    165 		}
    166 	}
    167 
    168 	@Override
    169 	public int available() throws IOException
    170 	{
    171 		synchronized (synchronizer)
    172 		{
    173 			if (isClosed)
    174 				throw new IOException("This StreamGobbler is closed.");
    175 
    176 			return write_pos - read_pos;
    177 		}
    178 	}
    179 
    180 	@Override
    181 	public int read(byte[] b) throws IOException
    182 	{
    183 		return read(b, 0, b.length);
    184 	}
    185 
    186 	@Override
    187 	public void close() throws IOException
    188 	{
    189 		synchronized (synchronizer)
    190 		{
    191 			if (isClosed)
    192 				return;
    193 			isClosed = true;
    194 			isEOF = true;
    195 			synchronizer.notifyAll();
    196 			is.close();
    197 		}
    198 	}
    199 
    200 	@Override
    201 	public int read(byte[] b, int off, int len) throws IOException
    202 	{
    203 		if (b == null)
    204 			throw new NullPointerException();
    205 
    206 		if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length))
    207 			throw new IndexOutOfBoundsException();
    208 
    209 		if (len == 0)
    210 			return 0;
    211 
    212 		boolean wasInterrupted = false;
    213 
    214 		try
    215 		{
    216 			synchronized (synchronizer)
    217 			{
    218 				if (isClosed)
    219 					throw new IOException("This StreamGobbler is closed.");
    220 
    221 				while (read_pos == write_pos)
    222 				{
    223 					if (exception != null)
    224 						throw exception;
    225 
    226 					if (isEOF)
    227 						return -1;
    228 
    229 					try
    230 					{
    231 						synchronizer.wait();
    232 					}
    233 					catch (InterruptedException e)
    234 					{
    235 						wasInterrupted = true;
    236 					}
    237 				}
    238 
    239 				int avail = write_pos - read_pos;
    240 
    241 				avail = (avail > len) ? len : avail;
    242 
    243 				System.arraycopy(buffer, read_pos, b, off, avail);
    244 
    245 				read_pos += avail;
    246 
    247 				return avail;
    248 			}
    249 		}
    250 		finally
    251 		{
    252 			if (wasInterrupted)
    253 				Thread.currentThread().interrupt();
    254 		}
    255 	}
    256 }
    257