Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2018 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static com.google.common.base.Preconditions.checkNotNull;
     21 import static com.google.common.base.Verify.verify;
     22 
     23 import com.google.common.annotations.VisibleForTesting;
     24 import com.google.common.base.MoreObjects;
     25 import com.google.common.base.Objects;
     26 import com.google.common.base.Strings;
     27 import io.grpc.CallOptions;
     28 import io.grpc.Channel;
     29 import io.grpc.ClientCall;
     30 import io.grpc.ClientInterceptor;
     31 import io.grpc.Deadline;
     32 import io.grpc.MethodDescriptor;
     33 import io.grpc.Status.Code;
     34 import java.util.Collections;
     35 import java.util.EnumSet;
     36 import java.util.HashMap;
     37 import java.util.List;
     38 import java.util.Map;
     39 import java.util.Set;
     40 import java.util.concurrent.TimeUnit;
     41 import java.util.concurrent.atomic.AtomicReference;
     42 import java.util.logging.Level;
     43 import java.util.logging.Logger;
     44 import javax.annotation.CheckForNull;
     45 import javax.annotation.Nonnull;
     46 
     47 /**
     48  * Modifies RPCs in in conformance with a Service Config.
     49  */
     50 final class ServiceConfigInterceptor implements ClientInterceptor {
     51 
     52   private static final Logger logger = Logger.getLogger(ServiceConfigInterceptor.class.getName());
     53 
     54   // Map from method name to MethodInfo
     55   @VisibleForTesting
     56   final AtomicReference<Map<String, MethodInfo>> serviceMethodMap
     57       = new AtomicReference<Map<String, MethodInfo>>();
     58   @VisibleForTesting
     59   final AtomicReference<Map<String, MethodInfo>> serviceMap
     60       = new AtomicReference<Map<String, MethodInfo>>();
     61 
     62   private final boolean retryEnabled;
     63   private final int maxRetryAttemptsLimit;
     64   private final int maxHedgedAttemptsLimit;
     65 
     66   // Setting this to true and observing this equal to true are run in different threads.
     67   private volatile boolean nameResolveComplete;
     68 
     69   ServiceConfigInterceptor(
     70       boolean retryEnabled, int maxRetryAttemptsLimit, int maxHedgedAttemptsLimit) {
     71     this.retryEnabled = retryEnabled;
     72     this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
     73     this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit;
     74   }
     75 
     76   void handleUpdate(@Nonnull Map<String, Object> serviceConfig) {
     77     Map<String, MethodInfo> newServiceMethodConfigs = new HashMap<String, MethodInfo>();
     78     Map<String, MethodInfo> newServiceConfigs = new HashMap<String, MethodInfo>();
     79 
     80     // Try and do as much validation here before we swap out the existing configuration.  In case
     81     // the input is invalid, we don't want to lose the existing configuration.
     82 
     83     List<Map<String, Object>> methodConfigs =
     84         ServiceConfigUtil.getMethodConfigFromServiceConfig(serviceConfig);
     85     if (methodConfigs == null) {
     86       logger.log(Level.FINE, "No method configs found, skipping");
     87       nameResolveComplete = true;
     88       return;
     89     }
     90 
     91     for (Map<String, Object> methodConfig : methodConfigs) {
     92       MethodInfo info = new MethodInfo(
     93           methodConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit);
     94 
     95       List<Map<String, Object>> nameList =
     96           ServiceConfigUtil.getNameListFromMethodConfig(methodConfig);
     97 
     98       checkArgument(
     99           nameList != null && !nameList.isEmpty(), "no names in method config %s", methodConfig);
    100       for (Map<String, Object> name : nameList) {
    101         String serviceName = ServiceConfigUtil.getServiceFromName(name);
    102         checkArgument(!Strings.isNullOrEmpty(serviceName), "missing service name");
    103         String methodName = ServiceConfigUtil.getMethodFromName(name);
    104         if (Strings.isNullOrEmpty(methodName)) {
    105           // Service scoped config
    106           checkArgument(
    107               !newServiceConfigs.containsKey(serviceName), "Duplicate service %s", serviceName);
    108           newServiceConfigs.put(serviceName, info);
    109         } else {
    110           // Method scoped config
    111           String fullMethodName = MethodDescriptor.generateFullMethodName(serviceName, methodName);
    112           checkArgument(
    113               !newServiceMethodConfigs.containsKey(fullMethodName),
    114               "Duplicate method name %s",
    115               fullMethodName);
    116           newServiceMethodConfigs.put(fullMethodName, info);
    117         }
    118       }
    119     }
    120 
    121     // Okay, service config is good, swap it.
    122     serviceMethodMap.set(Collections.unmodifiableMap(newServiceMethodConfigs));
    123     serviceMap.set(Collections.unmodifiableMap(newServiceConfigs));
    124     nameResolveComplete = true;
    125   }
    126 
    127   /**
    128    * Equivalent of MethodConfig from a ServiceConfig with restrictions from Channel setting.
    129    */
    130   static final class MethodInfo {
    131     final Long timeoutNanos;
    132     final Boolean waitForReady;
    133     final Integer maxInboundMessageSize;
    134     final Integer maxOutboundMessageSize;
    135     final RetryPolicy retryPolicy;
    136     final HedgingPolicy hedgingPolicy;
    137 
    138     /**
    139      * Constructor.
    140      *
    141      * @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect.
    142      */
    143     MethodInfo(
    144         Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit,
    145         int maxHedgedAttemptsLimit) {
    146       timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig);
    147       waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig);
    148       maxInboundMessageSize =
    149           ServiceConfigUtil.getMaxResponseMessageBytesFromMethodConfig(methodConfig);
    150       if (maxInboundMessageSize != null) {
    151         checkArgument(
    152             maxInboundMessageSize >= 0,
    153             "maxInboundMessageSize %s exceeds bounds", maxInboundMessageSize);
    154       }
    155       maxOutboundMessageSize =
    156           ServiceConfigUtil.getMaxRequestMessageBytesFromMethodConfig(methodConfig);
    157       if (maxOutboundMessageSize != null) {
    158         checkArgument(
    159             maxOutboundMessageSize >= 0,
    160             "maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize);
    161       }
    162 
    163       Map<String, Object> retryPolicyMap =
    164           retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null;
    165       retryPolicy = retryPolicyMap == null
    166           ? RetryPolicy.DEFAULT : retryPolicy(retryPolicyMap, maxRetryAttemptsLimit);
    167 
    168       Map<String, Object> hedgingPolicyMap =
    169           retryEnabled ? ServiceConfigUtil.getHedgingPolicyFromMethodConfig(methodConfig) : null;
    170       hedgingPolicy = hedgingPolicyMap == null
    171           ? HedgingPolicy.DEFAULT : hedgingPolicy(hedgingPolicyMap, maxHedgedAttemptsLimit);
    172     }
    173 
    174     @Override
    175     public int hashCode() {
    176       return Objects.hashCode(
    177           timeoutNanos, waitForReady, maxInboundMessageSize, maxOutboundMessageSize, retryPolicy);
    178     }
    179 
    180     @Override
    181     public boolean equals(Object other) {
    182       if (!(other instanceof MethodInfo)) {
    183         return false;
    184       }
    185       MethodInfo that = (MethodInfo) other;
    186       return Objects.equal(this.timeoutNanos, that.timeoutNanos)
    187           && Objects.equal(this.waitForReady, that.waitForReady)
    188           && Objects.equal(this.maxInboundMessageSize, that.maxInboundMessageSize)
    189           && Objects.equal(this.maxOutboundMessageSize, that.maxOutboundMessageSize)
    190           && Objects.equal(this.retryPolicy, that.retryPolicy);
    191     }
    192 
    193     @Override
    194     public String toString() {
    195       return MoreObjects.toStringHelper(this)
    196           .add("timeoutNanos", timeoutNanos)
    197           .add("waitForReady", waitForReady)
    198           .add("maxInboundMessageSize", maxInboundMessageSize)
    199           .add("maxOutboundMessageSize", maxOutboundMessageSize)
    200           .add("retryPolicy", retryPolicy)
    201           .toString();
    202     }
    203 
    204     @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
    205     private static RetryPolicy retryPolicy(Map<String, Object> retryPolicy, int maxAttemptsLimit) {
    206       int maxAttempts = checkNotNull(
    207           ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
    208           "maxAttempts cannot be empty");
    209       checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
    210       maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
    211 
    212       long initialBackoffNanos = checkNotNull(
    213           ServiceConfigUtil.getInitialBackoffNanosFromRetryPolicy(retryPolicy),
    214           "initialBackoff cannot be empty");
    215       checkArgument(
    216           initialBackoffNanos > 0,
    217           "initialBackoffNanos must be greater than 0: %s",
    218           initialBackoffNanos);
    219 
    220       long maxBackoffNanos = checkNotNull(
    221           ServiceConfigUtil.getMaxBackoffNanosFromRetryPolicy(retryPolicy),
    222           "maxBackoff cannot be empty");
    223       checkArgument(
    224           maxBackoffNanos > 0, "maxBackoff must be greater than 0: %s", maxBackoffNanos);
    225 
    226       double backoffMultiplier = checkNotNull(
    227           ServiceConfigUtil.getBackoffMultiplierFromRetryPolicy(retryPolicy),
    228           "backoffMultiplier cannot be empty");
    229       checkArgument(
    230           backoffMultiplier > 0,
    231           "backoffMultiplier must be greater than 0: %s",
    232           backoffMultiplier);
    233 
    234       List<String> rawCodes =
    235           ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
    236       checkNotNull(rawCodes, "rawCodes must be present");
    237       checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
    238       EnumSet<Code> codes = EnumSet.noneOf(Code.class);
    239       // service config doesn't say if duplicates are allowed, so just accept them.
    240       for (String rawCode : rawCodes) {
    241         verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
    242         codes.add(Code.valueOf(rawCode));
    243       }
    244       Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes);
    245 
    246       return new RetryPolicy(
    247           maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
    248           retryableStatusCodes);
    249     }
    250   }
    251 
    252   @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
    253   private static HedgingPolicy hedgingPolicy(
    254       Map<String, Object> hedgingPolicy, int maxAttemptsLimit) {
    255     int maxAttempts = checkNotNull(
    256         ServiceConfigUtil.getMaxAttemptsFromHedgingPolicy(hedgingPolicy),
    257         "maxAttempts cannot be empty");
    258     checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
    259     maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
    260 
    261     long hedgingDelayNanos = checkNotNull(
    262         ServiceConfigUtil.getHedgingDelayNanosFromHedgingPolicy(hedgingPolicy),
    263         "hedgingDelay cannot be empty");
    264     checkArgument(
    265         hedgingDelayNanos >= 0, "hedgingDelay must not be negative: %s", hedgingDelayNanos);
    266 
    267     List<String> rawCodes =
    268         ServiceConfigUtil.getNonFatalStatusCodesFromHedgingPolicy(hedgingPolicy);
    269     checkNotNull(rawCodes, "rawCodes must be present");
    270     checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
    271     EnumSet<Code> codes = EnumSet.noneOf(Code.class);
    272     // service config doesn't say if duplicates are allowed, so just accept them.
    273     for (String rawCode : rawCodes) {
    274       verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
    275       codes.add(Code.valueOf(rawCode));
    276     }
    277     Set<Code> nonFatalStatusCodes = Collections.unmodifiableSet(codes);
    278 
    279     return new HedgingPolicy(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes);
    280   }
    281 
    282   static final CallOptions.Key<RetryPolicy.Provider> RETRY_POLICY_KEY =
    283       CallOptions.Key.create("internal-retry-policy");
    284   static final CallOptions.Key<HedgingPolicy.Provider> HEDGING_POLICY_KEY =
    285       CallOptions.Key.create("internal-hedging-policy");
    286 
    287   @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
    288   @Override
    289   public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    290       final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    291     if (retryEnabled) {
    292       if (nameResolveComplete) {
    293         final RetryPolicy retryPolicy = getRetryPolicyFromConfig(method);
    294         final class ImmediateRetryPolicyProvider implements RetryPolicy.Provider {
    295           @Override
    296           public RetryPolicy get() {
    297             return retryPolicy;
    298           }
    299         }
    300 
    301         final HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method);
    302         final class ImmediateHedgingPolicyProvider implements HedgingPolicy.Provider {
    303           @Override
    304           public HedgingPolicy get() {
    305             return hedgingPolicy;
    306           }
    307         }
    308 
    309         verify(
    310             retryPolicy.equals(RetryPolicy.DEFAULT) || hedgingPolicy.equals(HedgingPolicy.DEFAULT),
    311             "Can not apply both retry and hedging policy for the method '%s'", method);
    312 
    313         callOptions = callOptions
    314             .withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider())
    315             .withOption(HEDGING_POLICY_KEY, new ImmediateHedgingPolicyProvider());
    316       } else {
    317         final class DelayedRetryPolicyProvider implements RetryPolicy.Provider {
    318           /**
    319            * Returns RetryPolicy.DEFAULT if name resolving is not complete at the moment the method
    320            * is invoked, otherwise returns the RetryPolicy computed from service config.
    321            *
    322            * <p>Note that this method is used no more than once for each call.
    323            */
    324           @Override
    325           public RetryPolicy get() {
    326             if (!nameResolveComplete) {
    327               return RetryPolicy.DEFAULT;
    328             }
    329             return getRetryPolicyFromConfig(method);
    330           }
    331         }
    332 
    333         final class DelayedHedgingPolicyProvider implements HedgingPolicy.Provider {
    334           /**
    335            * Returns HedgingPolicy.DEFAULT if name resolving is not complete at the moment the
    336            * method is invoked, otherwise returns the HedgingPolicy computed from service config.
    337            *
    338            * <p>Note that this method is used no more than once for each call.
    339            */
    340           @Override
    341           public HedgingPolicy get() {
    342             if (!nameResolveComplete) {
    343               return HedgingPolicy.DEFAULT;
    344             }
    345             HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method);
    346             verify(
    347                 hedgingPolicy.equals(HedgingPolicy.DEFAULT)
    348                     || getRetryPolicyFromConfig(method).equals(RetryPolicy.DEFAULT),
    349                 "Can not apply both retry and hedging policy for the method '%s'", method);
    350             return hedgingPolicy;
    351           }
    352         }
    353 
    354         callOptions = callOptions
    355             .withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider())
    356             .withOption(HEDGING_POLICY_KEY, new DelayedHedgingPolicyProvider());
    357       }
    358     }
    359 
    360     MethodInfo info = getMethodInfo(method);
    361     if (info == null) {
    362       return next.newCall(method, callOptions);
    363     }
    364 
    365     if (info.timeoutNanos != null) {
    366       Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
    367       Deadline existingDeadline = callOptions.getDeadline();
    368       // If the new deadline is sooner than the existing deadline, swap them.
    369       if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
    370         callOptions = callOptions.withDeadline(newDeadline);
    371       }
    372     }
    373     if (info.waitForReady != null) {
    374       callOptions =
    375           info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
    376     }
    377     if (info.maxInboundMessageSize != null) {
    378       Integer existingLimit = callOptions.getMaxInboundMessageSize();
    379       if (existingLimit != null) {
    380         callOptions = callOptions.withMaxInboundMessageSize(
    381             Math.min(existingLimit, info.maxInboundMessageSize));
    382       } else {
    383         callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
    384       }
    385     }
    386     if (info.maxOutboundMessageSize != null) {
    387       Integer existingLimit = callOptions.getMaxOutboundMessageSize();
    388       if (existingLimit != null) {
    389         callOptions = callOptions.withMaxOutboundMessageSize(
    390             Math.min(existingLimit, info.maxOutboundMessageSize));
    391       } else {
    392         callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
    393       }
    394     }
    395 
    396     return next.newCall(method, callOptions);
    397   }
    398 
    399   @CheckForNull
    400   private MethodInfo getMethodInfo(MethodDescriptor<?, ?> method) {
    401     Map<String, MethodInfo> localServiceMethodMap = serviceMethodMap.get();
    402     MethodInfo info = null;
    403     if (localServiceMethodMap != null) {
    404       info = localServiceMethodMap.get(method.getFullMethodName());
    405     }
    406     if (info == null) {
    407       Map<String, MethodInfo> localServiceMap = serviceMap.get();
    408       if (localServiceMap != null) {
    409         info = localServiceMap.get(
    410             MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
    411       }
    412     }
    413     return info;
    414   }
    415 
    416   @VisibleForTesting
    417   RetryPolicy getRetryPolicyFromConfig(MethodDescriptor<?, ?> method) {
    418     MethodInfo info = getMethodInfo(method);
    419     return info == null ? RetryPolicy.DEFAULT : info.retryPolicy;
    420   }
    421 
    422   @VisibleForTesting
    423   HedgingPolicy getHedgingPolicyFromConfig(MethodDescriptor<?, ?> method) {
    424     MethodInfo info = getMethodInfo(method);
    425     return info == null ? HedgingPolicy.DEFAULT : info.hedgingPolicy;
    426   }
    427 }
    428