Home | History | Annotate | Download | only in ssh2
      1 /*
      2  * Copyright (C) 2016 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
      5  * use this file except in compliance with the License. You may obtain a copy of
      6  * the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     13  * License for the specific language governing permissions and limitations under
     14  * the License.
     15  */
     16 
     17 package com.trilead.ssh2;
     18 
     19 import com.googlecode.android_scripting.Log;
     20 
     21 import java.io.File;
     22 import java.io.FileOutputStream;
     23 import java.io.IOException;
     24 import java.io.InputStream;
     25 
     26 /**
     27  * A <code>StreamGobbler</code> is an InputStream that uses an internal worker thread to constantly
     28  * consume input from another InputStream. It uses a buffer to store the consumed data. The buffer
     29  * size is automatically adjusted, if needed.
     30  * <p>
     31  * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR InputStreams
     32  * with instances of this class, then you don't have to bother about the shared window of STDOUT and
     33  * STDERR in the low level SSH-2 protocol, since all arriving data will be immediatelly consumed by
     34  * the worker threads. Also, as a side effect, the streams will be buffered (e.g., single byte
     35  * read() operations are faster).
     36  * <p>
     37  * Other SSH for Java libraries include this functionality by default in their STDOUT and STDERR
     38  * InputStream implementations, however, please be aware that this approach has also a downside:
     39  * <p>
     40  * If you do not call the StreamGobbler's <code>read()</code> method often enough and the peer is
     41  * constantly sending huge amounts of data, then you will sooner or later encounter a low memory
     42  * situation due to the aggregated data (well, it also depends on the Java heap size). Joe Average
     43  * will like this class anyway - a paranoid programmer would never use such an approach.
     44  * <p>
     45  * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't", see
     46  * http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
     47  *
     48  * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $
     49  */
     50 
     51 public class StreamGobbler extends InputStream {
     52   class GobblerThread extends Thread {
     53     @Override
     54     public void run() {
     55 
     56       while (true) {
     57         try {
     58           byte[] saveBuffer = null;
     59 
     60           int avail = is.read(buffer, write_pos, buffer.length - write_pos);
     61 
     62           synchronized (synchronizer) {
     63             if (avail <= 0) {
     64               isEOF = true;
     65               synchronizer.notifyAll();
     66               break;
     67             }
     68             write_pos += avail;
     69 
     70             int space_available = buffer.length - write_pos;
     71 
     72             if (space_available == 0) {
     73               if (read_pos > 0) {
     74                 saveBuffer = new byte[read_pos];
     75                 System.arraycopy(buffer, 0, saveBuffer, 0, read_pos);
     76                 System.arraycopy(buffer, read_pos, buffer, 0, buffer.length - read_pos);
     77                 write_pos -= read_pos;
     78                 read_pos = 0;
     79               } else {
     80                 write_pos = 0;
     81                 saveBuffer = buffer;
     82               }
     83             }
     84 
     85             synchronizer.notifyAll();
     86           }
     87 
     88           writeToFile(saveBuffer);
     89 
     90         } catch (IOException e) {
     91           synchronized (synchronizer) {
     92             exception = e;
     93             synchronizer.notifyAll();
     94             break;
     95           }
     96         }
     97       }
     98     }
     99   }
    100 
    101   private InputStream is;
    102   private GobblerThread t;
    103 
    104   private Object synchronizer = new Object();
    105 
    106   private boolean isEOF = false;
    107   private boolean isClosed = false;
    108   private IOException exception = null;
    109 
    110   private byte[] buffer;
    111   private int read_pos = 0;
    112   private int write_pos = 0;
    113   private final FileOutputStream mLogStream;
    114   private final int mBufferSize;
    115 
    116   public StreamGobbler(InputStream is, File log, int buffer_size) {
    117     this.is = is;
    118     mBufferSize = buffer_size;
    119     FileOutputStream out = null;
    120     try {
    121       out = new FileOutputStream(log, false);
    122     } catch (IOException e) {
    123       Log.e(e);
    124     }
    125     mLogStream = out;
    126     buffer = new byte[mBufferSize];
    127     t = new GobblerThread();
    128     t.setDaemon(true);
    129     t.start();
    130   }
    131 
    132   public void writeToFile(byte[] buffer) {
    133     if (mLogStream != null && buffer != null) {
    134       try {
    135         mLogStream.write(buffer);
    136       } catch (IOException e) {
    137         Log.e(e);
    138       }
    139     }
    140   }
    141 
    142   @Override
    143   public int read() throws IOException {
    144     synchronized (synchronizer) {
    145       if (isClosed) {
    146         throw new IOException("This StreamGobbler is closed.");
    147       }
    148 
    149       while (read_pos == write_pos) {
    150         if (exception != null) {
    151           throw exception;
    152         }
    153 
    154         if (isEOF) {
    155           return -1;
    156         }
    157 
    158         try {
    159           synchronizer.wait();
    160         } catch (InterruptedException e) {
    161         }
    162       }
    163 
    164       int b = buffer[read_pos++] & 0xff;
    165 
    166       return b;
    167     }
    168   }
    169 
    170   @Override
    171   public int available() throws IOException {
    172     synchronized (synchronizer) {
    173       if (isClosed) {
    174         throw new IOException("This StreamGobbler is closed.");
    175       }
    176 
    177       return write_pos - read_pos;
    178     }
    179   }
    180 
    181   @Override
    182   public int read(byte[] b) throws IOException {
    183     return read(b, 0, b.length);
    184   }
    185 
    186   @Override
    187   public void close() throws IOException {
    188     synchronized (synchronizer) {
    189       if (isClosed) {
    190         return;
    191       }
    192       isClosed = true;
    193       isEOF = true;
    194       synchronizer.notifyAll();
    195       is.close();
    196     }
    197   }
    198 
    199   @Override
    200   public int read(byte[] b, int off, int len) throws IOException {
    201     if (b == null) {
    202       throw new NullPointerException();
    203     }
    204 
    205     if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length)) {
    206       throw new IndexOutOfBoundsException();
    207     }
    208 
    209     if (len == 0) {
    210       return 0;
    211     }
    212 
    213     synchronized (synchronizer) {
    214       if (isClosed) {
    215         throw new IOException("This StreamGobbler is closed.");
    216       }
    217 
    218       while (read_pos == write_pos) {
    219         if (exception != null) {
    220           throw exception;
    221         }
    222 
    223         if (isEOF) {
    224           return -1;
    225         }
    226 
    227         try {
    228           synchronizer.wait();
    229         } catch (InterruptedException e) {
    230         }
    231       }
    232 
    233       int avail = write_pos - read_pos;
    234 
    235       avail = (avail > len) ? len : avail;
    236 
    237       System.arraycopy(buffer, read_pos, b, off, avail);
    238 
    239       read_pos += avail;
    240 
    241       return avail;
    242     }
    243   }
    244 }
    245