Home | History | Annotate | Download | only in benchmarks
      1 /*
      2  * Copyright (C) 2014 Square, Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp.benchmarks;
     17 
     18 import com.squareup.okhttp.internal.SslContextBuilder;
     19 import com.squareup.okhttp.internal.Util;
     20 import io.netty.bootstrap.Bootstrap;
     21 import io.netty.buffer.ByteBuf;
     22 import io.netty.buffer.PooledByteBufAllocator;
     23 import io.netty.channel.Channel;
     24 import io.netty.channel.ChannelHandlerContext;
     25 import io.netty.channel.ChannelInitializer;
     26 import io.netty.channel.ChannelOption;
     27 import io.netty.channel.ChannelPipeline;
     28 import io.netty.channel.SimpleChannelInboundHandler;
     29 import io.netty.channel.nio.NioEventLoopGroup;
     30 import io.netty.channel.socket.SocketChannel;
     31 import io.netty.channel.socket.nio.NioSocketChannel;
     32 import io.netty.handler.codec.http.DefaultFullHttpRequest;
     33 import io.netty.handler.codec.http.HttpClientCodec;
     34 import io.netty.handler.codec.http.HttpContent;
     35 import io.netty.handler.codec.http.HttpContentDecompressor;
     36 import io.netty.handler.codec.http.HttpHeaders;
     37 import io.netty.handler.codec.http.HttpMethod;
     38 import io.netty.handler.codec.http.HttpObject;
     39 import io.netty.handler.codec.http.HttpRequest;
     40 import io.netty.handler.codec.http.HttpResponse;
     41 import io.netty.handler.codec.http.HttpVersion;
     42 import io.netty.handler.codec.http.LastHttpContent;
     43 import io.netty.handler.ssl.SslHandler;
     44 import java.net.URL;
     45 import java.util.ArrayDeque;
     46 import java.util.Deque;
     47 import java.util.concurrent.TimeUnit;
     48 import javax.net.ssl.SSLContext;
     49 import javax.net.ssl.SSLEngine;
     50 
     51 /** Netty isn't an HTTP client, but it's almost one. */
     52 class NettyHttpClient implements HttpClient {
     53   private static final boolean VERBOSE = false;
     54 
     55   // Guarded by this. Real apps need more capable connection management.
     56   private final Deque<HttpChannel> freeChannels = new ArrayDeque<HttpChannel>();
     57   private final Deque<URL> backlog = new ArrayDeque<URL>();
     58 
     59   private int totalChannels = 0;
     60   private int concurrencyLevel;
     61   private int targetBacklog;
     62   private Bootstrap bootstrap;
     63 
     64   @Override public void prepare(final Benchmark benchmark) {
     65     this.concurrencyLevel = benchmark.concurrencyLevel;
     66     this.targetBacklog = benchmark.targetBacklog;
     67 
     68     ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
     69       @Override public void initChannel(SocketChannel channel) throws Exception {
     70         ChannelPipeline pipeline = channel.pipeline();
     71 
     72         if (benchmark.tls) {
     73           SSLContext sslContext = SslContextBuilder.localhost();
     74           SSLEngine engine = sslContext.createSSLEngine();
     75           engine.setUseClientMode(true);
     76           pipeline.addLast("ssl", new SslHandler(engine));
     77         }
     78 
     79         pipeline.addLast("codec", new HttpClientCodec());
     80         pipeline.addLast("inflater", new HttpContentDecompressor());
     81         pipeline.addLast("handler", new HttpChannel(channel));
     82       }
     83     };
     84 
     85     bootstrap = new Bootstrap();
     86     bootstrap.group(new NioEventLoopGroup(concurrencyLevel))
     87         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
     88         .channel(NioSocketChannel.class)
     89         .handler(channelInitializer);
     90   }
     91 
     92   @Override public void enqueue(URL url) throws Exception {
     93     HttpChannel httpChannel = null;
     94     synchronized (this) {
     95       if (!freeChannels.isEmpty()) {
     96         httpChannel = freeChannels.pop();
     97       } else if (totalChannels < concurrencyLevel) {
     98         totalChannels++; // Create a new channel. (outside of the synchronized block).
     99       } else {
    100         backlog.add(url); // Enqueue this for later, to be picked up when another request completes.
    101         return;
    102       }
    103     }
    104     if (httpChannel == null) {
    105       Channel channel = bootstrap.connect(url.getHost(), Util.getEffectivePort(url))
    106           .sync().channel();
    107       httpChannel = (HttpChannel) channel.pipeline().last();
    108     }
    109     httpChannel.sendRequest(url);
    110   }
    111 
    112   @Override public synchronized boolean acceptingJobs() {
    113     return backlog.size() < targetBacklog || hasFreeChannels();
    114   }
    115 
    116   private boolean hasFreeChannels() {
    117     int activeChannels = totalChannels - freeChannels.size();
    118     return activeChannels < concurrencyLevel;
    119   }
    120 
    121   private void release(HttpChannel httpChannel) {
    122     URL url;
    123     synchronized (this) {
    124       url = backlog.pop();
    125       if (url == null) {
    126         // There were no URLs in the backlog. Pool this channel for later.
    127         freeChannels.push(httpChannel);
    128         return;
    129       }
    130     }
    131 
    132     // We removed a URL from the backlog. Schedule it right away.
    133     httpChannel.sendRequest(url);
    134   }
    135 
    136   class HttpChannel extends SimpleChannelInboundHandler<HttpObject> {
    137     private final SocketChannel channel;
    138     byte[] buffer = new byte[1024];
    139     int total;
    140     long start;
    141 
    142     public HttpChannel(SocketChannel channel) {
    143       this.channel = channel;
    144     }
    145 
    146     private void sendRequest(URL url) {
    147       start = System.nanoTime();
    148       total = 0;
    149       HttpRequest request = new DefaultFullHttpRequest(
    150           HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath());
    151       request.headers().set(HttpHeaders.Names.HOST, url.getHost());
    152       request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
    153       channel.writeAndFlush(request);
    154     }
    155 
    156     @Override protected void channelRead0(
    157         ChannelHandlerContext context, HttpObject message) throws Exception {
    158       if (message instanceof HttpResponse) {
    159         receive((HttpResponse) message);
    160       }
    161       if (message instanceof HttpContent) {
    162         receive((HttpContent) message);
    163         if (message instanceof LastHttpContent) {
    164           release(this);
    165         }
    166       }
    167     }
    168 
    169     @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    170       super.channelInactive(ctx);
    171     }
    172 
    173     void receive(HttpResponse response) {
    174       // Don't do anything with headers.
    175     }
    176 
    177     void receive(HttpContent content) {
    178       // Consume the response body.
    179       ByteBuf byteBuf = content.content();
    180       for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) {
    181         byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead));
    182         total += toRead;
    183       }
    184 
    185       if (VERBOSE && content instanceof LastHttpContent) {
    186         long finish = System.nanoTime();
    187         System.out.println(String.format("Transferred % 8d bytes in %4d ms",
    188             total, TimeUnit.NANOSECONDS.toMillis(finish - start)));
    189       }
    190     }
    191 
    192     @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
    193       System.out.println("Failed: " + cause);
    194     }
    195   }
    196 }
    197