Home | History | Annotate | Download | only in dexfuzz
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package dexfuzz;
     18 
     19 import java.io.BufferedReader;
     20 import java.io.InputStream;
     21 import java.io.InputStreamReader;
     22 import java.io.IOException;
     23 import java.util.ArrayList;
     24 import java.util.List;
     25 import java.util.concurrent.Semaphore;
     26 
     27 /**
     28  * process.waitFor() can block if its output buffers are not drained.
     29  * These threads are used to keep the buffers drained, and provide the final
     30  * output once the command has finished executing. Each Executor has its own
     31  * output and error StreamConsumers.
     32  */
     33 public class StreamConsumer extends Thread {
     34   private List<String> output;
     35   private BufferedReader reader;
     36 
     37   private State state;
     38 
     39   private Semaphore workToBeDone;
     40   private Semaphore outputIsReady;
     41 
     42   enum State {
     43     WAITING,
     44     CONSUMING,
     45     SHOULD_STOP_CONSUMING,
     46     FINISHED,
     47     ERROR
     48   }
     49 
     50   /**
     51    * Create a StreamConsumer, will be immediately ready to start consuming.
     52    */
     53   public StreamConsumer() {
     54     output = new ArrayList<String>();
     55     workToBeDone = new Semaphore(0);
     56     outputIsReady = new Semaphore(0);
     57 
     58     state = State.WAITING;
     59   }
     60 
     61   /**
     62    * Executor should call this to provide its StreamConsumers with the Streams
     63    * for a Process it is about to call waitFor() on.
     64    */
     65   public void giveStreamAndStartConsuming(InputStream stream) {
     66     output.clear();
     67 
     68     reader = new BufferedReader(new InputStreamReader(stream));
     69 
     70     changeState(State.CONSUMING, State.WAITING);
     71 
     72     // Tell consumer there is work to be done.
     73     workToBeDone.release();
     74   }
     75 
     76   /**
     77    * Executor should call this once its call to waitFor() returns.
     78    */
     79   public void processFinished() {
     80     changeState(State.SHOULD_STOP_CONSUMING, State.CONSUMING);
     81   }
     82 
     83   /**
     84    * Executor should call this to get the captured output of this StreamConsumer.
     85    */
     86   public List<String> getOutput() {
     87 
     88     try {
     89       // Wait until the output is ready.
     90       outputIsReady.acquire();
     91     } catch (InterruptedException e) {
     92       Log.error("Client of StreamConsumer was interrupted while waiting for output?");
     93       return null;
     94     }
     95 
     96     // Take a copy of the Strings, so when we call output.clear(), we don't
     97     // clear the ExecutionResult's list.
     98     List<String> copy = new ArrayList<String>(output);
     99     return copy;
    100   }
    101 
    102   /**
    103    * Executor should call this when we're shutting down.
    104    */
    105   public void shutdown() {
    106     changeState(State.FINISHED, State.WAITING);
    107 
    108     // Tell Consumer there is work to be done (it will check first if FINISHED has been set.)
    109     workToBeDone.release();
    110   }
    111 
    112   private void consume() {
    113     try {
    114 
    115       if (checkState(State.SHOULD_STOP_CONSUMING)) {
    116         // Caller already called processFinished() before we even started
    117         // consuming. Just get what we can and finish.
    118         while (reader.ready()) {
    119           output.add(reader.readLine());
    120         }
    121       } else {
    122         // Caller's process is still executing, so just loop and consume.
    123         while (checkState(State.CONSUMING)) {
    124           Thread.sleep(50);
    125           while (reader.ready()) {
    126             output.add(reader.readLine());
    127           }
    128         }
    129       }
    130 
    131       if (checkState(State.SHOULD_STOP_CONSUMING)) {
    132         changeState(State.WAITING, State.SHOULD_STOP_CONSUMING);
    133       } else {
    134         Log.error("StreamConsumer stopped consuming, but was not told to?");
    135         setErrorState();
    136       }
    137 
    138       reader.close();
    139 
    140     } catch (IOException e) {
    141       Log.error("StreamConsumer caught IOException while consuming");
    142       setErrorState();
    143     } catch (InterruptedException e) {
    144       Log.error("StreamConsumer caught InterruptedException while consuming");
    145       setErrorState();
    146     }
    147 
    148     // Tell client of Consumer that the output is ready.
    149     outputIsReady.release();
    150   }
    151 
    152   @Override
    153   public void run() {
    154     while (checkState(State.WAITING)) {
    155       try {
    156         // Wait until there is work to be done
    157         workToBeDone.acquire();
    158       } catch (InterruptedException e) {
    159         Log.error("StreamConsumer caught InterruptedException while waiting for work");
    160         setErrorState();
    161         break;
    162       }
    163 
    164       // Check first if we're done
    165       if (checkState(State.FINISHED)) {
    166         break;
    167       }
    168 
    169       // Make sure we're either supposed to be consuming
    170       // or supposed to be finishing up consuming
    171       if (!(checkState(State.CONSUMING) || checkState(State.SHOULD_STOP_CONSUMING))) {
    172         Log.error("invalid state: StreamConsumer told about work, but not CONSUMING?");
    173         Log.error("state was: " + getCurrentState());
    174         setErrorState();
    175         break;
    176       }
    177 
    178       consume();
    179     }
    180   }
    181 
    182   private synchronized boolean checkState(State expectedState) {
    183     return (expectedState == state);
    184   }
    185 
    186   private synchronized void changeState(State newState, State previousState) {
    187     if (state != previousState) {
    188       Log.error("StreamConsumer Unexpected state: " + state + ", expected " + previousState);
    189       state = State.ERROR;
    190     } else {
    191       state = newState;
    192     }
    193   }
    194 
    195   private synchronized void setErrorState() {
    196     state = State.ERROR;
    197   }
    198 
    199   private synchronized State getCurrentState() {
    200     return state;
    201   }
    202 }
    203