Home | History | Annotate | Download | only in http
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package coretestutils.http;
     18 
     19 import java.io.BufferedInputStream;
     20 import java.io.BufferedOutputStream;
     21 import java.io.ByteArrayOutputStream;
     22 import java.io.File;
     23 import java.io.IOException;
     24 import java.io.InputStream;
     25 import java.io.OutputStream;
     26 import java.net.MalformedURLException;
     27 import java.net.ServerSocket;
     28 import java.net.Socket;
     29 import java.net.URL;
     30 import java.util.ArrayList;
     31 import java.util.LinkedList;
     32 import java.util.List;
     33 import java.util.Queue;
     34 import java.util.concurrent.BlockingQueue;
     35 import java.util.concurrent.Callable;
     36 import java.util.concurrent.ExecutionException;
     37 import java.util.concurrent.ExecutorService;
     38 import java.util.concurrent.Executors;
     39 import java.util.concurrent.Future;
     40 import java.util.concurrent.LinkedBlockingQueue;
     41 import java.util.concurrent.TimeUnit;
     42 import java.util.concurrent.TimeoutException;
     43 
     44 import android.util.Log;
     45 
     46 /**
     47  * A scriptable web server. Callers supply canned responses and the server
     48  * replays them upon request in sequence.
     49  *
     50  * TODO: merge with the version from libcore/support/src/tests/java once it's in.
     51  */
     52 public final class MockWebServer {
     53     static final String ASCII = "US-ASCII";
     54     static final String LOG_TAG = "coretestutils.http.MockWebServer";
     55 
     56     private final BlockingQueue<RecordedRequest> requestQueue
     57             = new LinkedBlockingQueue<RecordedRequest>();
     58     private final BlockingQueue<MockResponse> responseQueue
     59             = new LinkedBlockingQueue<MockResponse>();
     60     private int bodyLimit = Integer.MAX_VALUE;
     61     private final ExecutorService executor = Executors.newCachedThreadPool();
     62     // keep Futures around so we can rethrow any exceptions thrown by Callables
     63     private final Queue<Future<?>> futures = new LinkedList<Future<?>>();
     64     private final Object downloadPauseLock = new Object();
     65     // global flag to signal when downloads should resume on the server
     66     private volatile boolean downloadResume = false;
     67 
     68     private int port = -1;
     69 
     70     public int getPort() {
     71         if (port == -1) {
     72             throw new IllegalStateException("Cannot retrieve port before calling play()");
     73         }
     74         return port;
     75     }
     76 
     77     /**
     78      * Returns a URL for connecting to this server.
     79      *
     80      * @param path the request path, such as "/".
     81      */
     82     public URL getUrl(String path) throws MalformedURLException {
     83         return new URL("http://localhost:" + getPort() + path);
     84     }
     85 
     86     /**
     87      * Sets the number of bytes of the POST body to keep in memory to the given
     88      * limit.
     89      */
     90     public void setBodyLimit(int maxBodyLength) {
     91         this.bodyLimit = maxBodyLength;
     92     }
     93 
     94     public void enqueue(MockResponse response) {
     95         responseQueue.add(response);
     96     }
     97 
     98     /**
     99      * Awaits the next HTTP request, removes it, and returns it. Callers should
    100      * use this to verify the request sent was as intended.
    101      */
    102     public RecordedRequest takeRequest() throws InterruptedException {
    103         return requestQueue.take();
    104     }
    105 
    106     public RecordedRequest takeRequestWithTimeout(long timeoutMillis) throws InterruptedException {
    107         return requestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    108     }
    109 
    110     public List<RecordedRequest> drainRequests() {
    111         List<RecordedRequest> requests = new ArrayList<RecordedRequest>();
    112         requestQueue.drainTo(requests);
    113         return requests;
    114     }
    115 
    116     /**
    117      * Starts the server, serves all enqueued requests, and shuts the server
    118      * down using the default (server-assigned) port.
    119      */
    120     public void play() throws IOException {
    121         play(0);
    122     }
    123 
    124     /**
    125      * Starts the server, serves all enqueued requests, and shuts the server
    126      * down.
    127      *
    128      * @param port The port number to use to listen to connections on; pass in 0 to have the
    129      * server automatically assign a free port
    130      */
    131     public void play(int portNumber) throws IOException {
    132         final ServerSocket ss = new ServerSocket(portNumber);
    133         ss.setReuseAddress(true);
    134         port = ss.getLocalPort();
    135         submitCallable(new Callable<Void>() {
    136             public Void call() throws Exception {
    137                 int count = 0;
    138                 while (true) {
    139                     if (count > 0 && responseQueue.isEmpty()) {
    140                         ss.close();
    141                         executor.shutdown();
    142                         return null;
    143                     }
    144 
    145                     serveConnection(ss.accept());
    146                     count++;
    147                 }
    148             }
    149         });
    150     }
    151 
    152     private void serveConnection(final Socket s) {
    153         submitCallable(new Callable<Void>() {
    154             public Void call() throws Exception {
    155                 InputStream in = new BufferedInputStream(s.getInputStream());
    156                 OutputStream out = new BufferedOutputStream(s.getOutputStream());
    157 
    158                 int sequenceNumber = 0;
    159                 while (true) {
    160                     RecordedRequest request = readRequest(in, sequenceNumber);
    161                     if (request == null) {
    162                         if (sequenceNumber == 0) {
    163                             throw new IllegalStateException("Connection without any request!");
    164                         } else {
    165                             break;
    166                         }
    167                     }
    168                     requestQueue.add(request);
    169                     MockResponse response = computeResponse(request);
    170                     writeResponse(out, response);
    171                     if (response.shouldCloseConnectionAfter()) {
    172                         break;
    173                     }
    174                     sequenceNumber++;
    175                 }
    176 
    177                 in.close();
    178                 out.close();
    179                 return null;
    180             }
    181         });
    182     }
    183 
    184     private void submitCallable(Callable<?> callable) {
    185         Future<?> future = executor.submit(callable);
    186         futures.add(future);
    187     }
    188 
    189     /**
    190      * Check for and raise any exceptions that have been thrown by child threads.  Will not block on
    191      * children still running.
    192      * @throws ExecutionException for the first child thread that threw an exception
    193      */
    194     public void checkForExceptions() throws ExecutionException, InterruptedException {
    195         final int originalSize = futures.size();
    196         for (int i = 0; i < originalSize; i++) {
    197             Future<?> future = futures.remove();
    198             try {
    199                 future.get(0, TimeUnit.SECONDS);
    200             } catch (TimeoutException e) {
    201                 futures.add(future); // still running
    202             }
    203         }
    204     }
    205 
    206     /**
    207      * @param sequenceNumber the index of this request on this connection.
    208      */
    209     private RecordedRequest readRequest(InputStream in, int sequenceNumber) throws IOException {
    210         String request = readAsciiUntilCrlf(in);
    211         if (request.equals("")) {
    212             return null; // end of data; no more requests
    213         }
    214 
    215         List<String> headers = new ArrayList<String>();
    216         int contentLength = -1;
    217         boolean chunked = false;
    218         String header;
    219         while (!(header = readAsciiUntilCrlf(in)).equals("")) {
    220             headers.add(header);
    221             String lowercaseHeader = header.toLowerCase();
    222             if (contentLength == -1 && lowercaseHeader.startsWith("content-length:")) {
    223                 contentLength = Integer.parseInt(header.substring(15).trim());
    224             }
    225             if (lowercaseHeader.startsWith("transfer-encoding:") &&
    226                     lowercaseHeader.substring(18).trim().equals("chunked")) {
    227                 chunked = true;
    228             }
    229         }
    230 
    231         boolean hasBody = false;
    232         TruncatingOutputStream requestBody = new TruncatingOutputStream();
    233         List<Integer> chunkSizes = new ArrayList<Integer>();
    234         if (contentLength != -1) {
    235             hasBody = true;
    236             transfer(contentLength, in, requestBody);
    237         } else if (chunked) {
    238             hasBody = true;
    239             while (true) {
    240                 int chunkSize = Integer.parseInt(readAsciiUntilCrlf(in).trim(), 16);
    241                 if (chunkSize == 0) {
    242                     readEmptyLine(in);
    243                     break;
    244                 }
    245                 chunkSizes.add(chunkSize);
    246                 transfer(chunkSize, in, requestBody);
    247                 readEmptyLine(in);
    248             }
    249         }
    250 
    251         if (request.startsWith("GET ")) {
    252             if (hasBody) {
    253                 throw new IllegalArgumentException("GET requests should not have a body!");
    254             }
    255         } else if (request.startsWith("POST ")) {
    256             if (!hasBody) {
    257                 throw new IllegalArgumentException("POST requests must have a body!");
    258             }
    259         } else {
    260             throw new UnsupportedOperationException("Unexpected method: " + request);
    261         }
    262 
    263         return new RecordedRequest(request, headers, chunkSizes,
    264                 requestBody.numBytesReceived, requestBody.toByteArray(), sequenceNumber);
    265     }
    266 
    267     /**
    268      * Returns a response to satisfy {@code request}.
    269      */
    270     private MockResponse computeResponse(RecordedRequest request) throws InterruptedException {
    271         if (responseQueue.isEmpty()) {
    272             throw new IllegalStateException("Unexpected request: " + request);
    273         }
    274         return responseQueue.take();
    275     }
    276 
    277     private void writeResponse(OutputStream out, MockResponse response) throws IOException {
    278         out.write((response.getStatus() + "\r\n").getBytes(ASCII));
    279         boolean doCloseConnectionAfterHeader = (response.getCloseConnectionAfterHeader() != null);
    280 
    281         // Send headers
    282         String closeConnectionAfterHeader = response.getCloseConnectionAfterHeader();
    283         for (String header : response.getHeaders()) {
    284             out.write((header + "\r\n").getBytes(ASCII));
    285 
    286             if (doCloseConnectionAfterHeader && header.startsWith(closeConnectionAfterHeader)) {
    287                 Log.i(LOG_TAG, "Closing connection after header" + header);
    288                 break;
    289             }
    290         }
    291 
    292         // Send actual body data
    293         if (!doCloseConnectionAfterHeader) {
    294             out.write(("\r\n").getBytes(ASCII));
    295 
    296             InputStream body = response.getBody();
    297             final int READ_BLOCK_SIZE = 10000;  // process blocks this size
    298             byte[] currentBlock = new byte[READ_BLOCK_SIZE];
    299             int currentBlockSize = 0;
    300             int writtenSoFar = 0;
    301 
    302             boolean shouldPause = response.getShouldPause();
    303             boolean shouldClose = response.getShouldClose();
    304             int pause = response.getPauseConnectionAfterXBytes();
    305             int close = response.getCloseConnectionAfterXBytes();
    306 
    307             // Don't bother pausing if it's set to pause -after- the connection should be dropped
    308             if (shouldPause && shouldClose && (pause > close)) {
    309                 shouldPause = false;
    310             }
    311 
    312             // Process each block we read in...
    313             while ((currentBlockSize = body.read(currentBlock)) != -1) {
    314                 int startIndex = 0;
    315                 int writeLength = currentBlockSize;
    316 
    317                 // handle the case of pausing
    318                 if (shouldPause && (writtenSoFar + currentBlockSize >= pause)) {
    319                     writeLength = pause - writtenSoFar;
    320                     out.write(currentBlock, 0, writeLength);
    321                     out.flush();
    322                     writtenSoFar += writeLength;
    323 
    324                     // now pause...
    325                     try {
    326                         Log.i(LOG_TAG, "Pausing connection after " + pause + " bytes");
    327                         // Wait until someone tells us to resume sending...
    328                         synchronized(downloadPauseLock) {
    329                             while (!downloadResume) {
    330                                 downloadPauseLock.wait();
    331                             }
    332                             // reset resume back to false
    333                             downloadResume = false;
    334                         }
    335                     } catch (InterruptedException e) {
    336                         Log.e(LOG_TAG, "Server was interrupted during pause in download.");
    337                     }
    338 
    339                     startIndex = writeLength;
    340                     writeLength = currentBlockSize - writeLength;
    341                 }
    342 
    343                 // handle the case of closing the connection
    344                 if (shouldClose && (writtenSoFar + writeLength > close)) {
    345                     writeLength = close - writtenSoFar;
    346                     out.write(currentBlock, startIndex, writeLength);
    347                     writtenSoFar += writeLength;
    348                     Log.i(LOG_TAG, "Closing connection after " + close + " bytes");
    349                     break;
    350                 }
    351                 out.write(currentBlock, startIndex, writeLength);
    352                 writtenSoFar += writeLength;
    353             }
    354         }
    355         out.flush();
    356     }
    357 
    358     /**
    359      * Transfer bytes from {@code in} to {@code out} until either {@code length}
    360      * bytes have been transferred or {@code in} is exhausted.
    361      */
    362     private void transfer(int length, InputStream in, OutputStream out) throws IOException {
    363         byte[] buffer = new byte[1024];
    364         while (length > 0) {
    365             int count = in.read(buffer, 0, Math.min(buffer.length, length));
    366             if (count == -1) {
    367                 return;
    368             }
    369             out.write(buffer, 0, count);
    370             length -= count;
    371         }
    372     }
    373 
    374     /**
    375      * Returns the text from {@code in} until the next "\r\n", or null if
    376      * {@code in} is exhausted.
    377      */
    378     private String readAsciiUntilCrlf(InputStream in) throws IOException {
    379         StringBuilder builder = new StringBuilder();
    380         while (true) {
    381             int c = in.read();
    382             if (c == '\n' && builder.length() > 0 && builder.charAt(builder.length() - 1) == '\r') {
    383                 builder.deleteCharAt(builder.length() - 1);
    384                 return builder.toString();
    385             } else if (c == -1) {
    386                 return builder.toString();
    387             } else {
    388                 builder.append((char) c);
    389             }
    390         }
    391     }
    392 
    393     private void readEmptyLine(InputStream in) throws IOException {
    394         String line = readAsciiUntilCrlf(in);
    395         if (!line.equals("")) {
    396             throw new IllegalStateException("Expected empty but was: " + line);
    397         }
    398     }
    399 
    400     /**
    401      * An output stream that drops data after bodyLimit bytes.
    402      */
    403     private class TruncatingOutputStream extends ByteArrayOutputStream {
    404         private int numBytesReceived = 0;
    405         @Override public void write(byte[] buffer, int offset, int len) {
    406             numBytesReceived += len;
    407             super.write(buffer, offset, Math.min(len, bodyLimit - count));
    408         }
    409         @Override public void write(int oneByte) {
    410             numBytesReceived++;
    411             if (count < bodyLimit) {
    412                 super.write(oneByte);
    413             }
    414         }
    415     }
    416 
    417     /**
    418      * Trigger the server to resume sending the download
    419      */
    420     public void doResumeDownload() {
    421         synchronized (downloadPauseLock) {
    422             downloadResume = true;
    423             downloadPauseLock.notifyAll();
    424         }
    425     }
    426 }
    427