Home | History | Annotate | Download | only in services
      1 /*
      2  * Copyright 2017 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.services;
     18 
     19 import com.google.common.annotations.VisibleForTesting;
     20 import com.google.common.base.Preconditions;
     21 import io.grpc.BinaryLog;
     22 import io.grpc.CallOptions;
     23 import io.grpc.Channel;
     24 import io.grpc.ClientCall;
     25 import io.grpc.ClientInterceptor;
     26 import io.grpc.ClientInterceptors;
     27 import io.grpc.Internal;
     28 import io.grpc.InternalClientInterceptors;
     29 import io.grpc.InternalServerInterceptors;
     30 import io.grpc.ManagedChannel;
     31 import io.grpc.MethodDescriptor;
     32 import io.grpc.MethodDescriptor.Marshaller;
     33 import io.grpc.ServerCallHandler;
     34 import io.grpc.ServerInterceptor;
     35 import io.grpc.ServerMethodDefinition;
     36 import java.io.ByteArrayInputStream;
     37 import java.io.ByteArrayOutputStream;
     38 import java.io.IOException;
     39 import java.io.InputStream;
     40 import java.io.OutputStream;
     41 import javax.annotation.Nullable;
     42 
     43 // TODO(zpencer): rename class to AbstractBinaryLog
     44 @Internal
     45 public abstract class BinaryLogProvider extends BinaryLog {
     46   @VisibleForTesting
     47   public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
     48 
     49   private final ClientInterceptor binaryLogShim = new BinaryLogShim();
     50 
     51   /**
     52    * Wraps a channel to provide binary logging on {@link ClientCall}s as needed.
     53    */
     54   @Override
     55   public final Channel wrapChannel(Channel channel) {
     56     return ClientInterceptors.intercept(channel, binaryLogShim);
     57   }
     58 
     59   private static MethodDescriptor<byte[], byte[]> toByteBufferMethod(
     60       MethodDescriptor<?, ?> method) {
     61     return method.toBuilder(BYTEARRAY_MARSHALLER, BYTEARRAY_MARSHALLER).build();
     62   }
     63 
     64   /**
     65    * Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed.
     66    */
     67   @Override
     68   public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
     69       ServerMethodDefinition<ReqT, RespT> oMethodDef) {
     70     ServerInterceptor binlogInterceptor =
     71         getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName());
     72     if (binlogInterceptor == null) {
     73       return oMethodDef;
     74     }
     75     MethodDescriptor<byte[], byte[]> binMethod =
     76         BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor());
     77     ServerMethodDefinition<byte[], byte[]> binDef =
     78         InternalServerInterceptors.wrapMethod(oMethodDef, binMethod);
     79     ServerCallHandler<byte[], byte[]> binlogHandler =
     80         InternalServerInterceptors.interceptCallHandlerCreate(
     81             binlogInterceptor, binDef.getServerCallHandler());
     82     return ServerMethodDefinition.create(binMethod, binlogHandler);
     83   }
     84 
     85   /**
     86    * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor,
     87    * so the interceptor must be reusable across calls. At runtime, the request and response
     88    * marshallers are always {@code Marshaller<InputStream>}.
     89    * Returns {@code null} if this method is not binary logged.
     90    */
     91   // TODO(zpencer): ensure the interceptor properly handles retries and hedging
     92   @Nullable
     93   protected abstract ServerInterceptor getServerInterceptor(String fullMethodName);
     94 
     95   /**
     96    * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor,
     97    * so the interceptor must be reusable across calls. At runtime, the request and response
     98    * marshallers are always {@code Marshaller<InputStream>}.
     99    * Returns {@code null} if this method is not binary logged.
    100    */
    101   // TODO(zpencer): ensure the interceptor properly handles retries and hedging
    102   @Nullable
    103   protected abstract ClientInterceptor getClientInterceptor(
    104       String fullMethodName, CallOptions callOptions);
    105 
    106   @Override
    107   public void close() throws IOException {
    108     // default impl: noop
    109     // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
    110   }
    111 
    112   // Creating a named class makes debugging easier
    113   private static final class ByteArrayMarshaller implements Marshaller<byte[]> {
    114     @Override
    115     public InputStream stream(byte[] value) {
    116       return new ByteArrayInputStream(value);
    117     }
    118 
    119     @Override
    120     public byte[] parse(InputStream stream) {
    121       try {
    122         return parseHelper(stream);
    123       } catch (IOException e) {
    124         throw new RuntimeException(e);
    125       }
    126     }
    127 
    128     private byte[] parseHelper(InputStream stream) throws IOException {
    129       try {
    130         return IoUtils.toByteArray(stream);
    131       } finally {
    132         stream.close();
    133       }
    134     }
    135   }
    136 
    137   /**
    138    * The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created.
    139    * This shim interceptor should always be installed as a placeholder. When a call starts,
    140    * this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen
    141    * for this particular {@link ClientCall}'s method.
    142    */
    143   private final class BinaryLogShim implements ClientInterceptor {
    144     @Override
    145     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    146         MethodDescriptor<ReqT, RespT> method,
    147         CallOptions callOptions,
    148         Channel next) {
    149       ClientInterceptor binlogInterceptor = getClientInterceptor(
    150           method.getFullMethodName(), callOptions);
    151       if (binlogInterceptor == null) {
    152         return next.newCall(method, callOptions);
    153       } else {
    154         return InternalClientInterceptors
    155             .wrapClientInterceptor(
    156                 binlogInterceptor,
    157                 BYTEARRAY_MARSHALLER,
    158                 BYTEARRAY_MARSHALLER)
    159             .interceptCall(method, callOptions, next);
    160       }
    161     }
    162   }
    163 
    164   // Copied from internal
    165   private static final class IoUtils {
    166     /** maximum buffer to be read is 16 KB. */
    167     private static final int MAX_BUFFER_LENGTH = 16384;
    168 
    169     /** Returns the byte array. */
    170     public static byte[] toByteArray(InputStream in) throws IOException {
    171       ByteArrayOutputStream out = new ByteArrayOutputStream();
    172       copy(in, out);
    173       return out.toByteArray();
    174     }
    175 
    176     /** Copies the data from input stream to output stream. */
    177     public static long copy(InputStream from, OutputStream to) throws IOException {
    178       // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
    179       Preconditions.checkNotNull(from);
    180       Preconditions.checkNotNull(to);
    181       byte[] buf = new byte[MAX_BUFFER_LENGTH];
    182       long total = 0;
    183       while (true) {
    184         int r = from.read(buf);
    185         if (r == -1) {
    186           break;
    187         }
    188         to.write(buf, 0, r);
    189         total += r;
    190       }
    191       return total;
    192     }
    193   }
    194 }
    195