Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2016 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static com.google.common.base.Preconditions.checkNotNull;
     21 import static com.google.common.base.Preconditions.checkState;
     22 import static io.grpc.ConnectivityState.IDLE;
     23 import static io.grpc.ConnectivityState.SHUTDOWN;
     24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
     25 import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
     26 import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
     27 
     28 import com.google.common.annotations.VisibleForTesting;
     29 import com.google.common.base.MoreObjects;
     30 import com.google.common.base.Stopwatch;
     31 import com.google.common.base.Supplier;
     32 import com.google.common.util.concurrent.ListenableFuture;
     33 import com.google.common.util.concurrent.SettableFuture;
     34 import io.grpc.Attributes;
     35 import io.grpc.CallOptions;
     36 import io.grpc.Channel;
     37 import io.grpc.ClientCall;
     38 import io.grpc.ClientInterceptor;
     39 import io.grpc.ClientInterceptors;
     40 import io.grpc.ClientStreamTracer;
     41 import io.grpc.CompressorRegistry;
     42 import io.grpc.ConnectivityState;
     43 import io.grpc.ConnectivityStateInfo;
     44 import io.grpc.Context;
     45 import io.grpc.DecompressorRegistry;
     46 import io.grpc.EquivalentAddressGroup;
     47 import io.grpc.InternalChannelz;
     48 import io.grpc.InternalChannelz.ChannelStats;
     49 import io.grpc.InternalChannelz.ChannelTrace;
     50 import io.grpc.InternalInstrumented;
     51 import io.grpc.InternalLogId;
     52 import io.grpc.InternalWithLogId;
     53 import io.grpc.LoadBalancer;
     54 import io.grpc.LoadBalancer.PickResult;
     55 import io.grpc.LoadBalancer.PickSubchannelArgs;
     56 import io.grpc.LoadBalancer.SubchannelPicker;
     57 import io.grpc.ManagedChannel;
     58 import io.grpc.Metadata;
     59 import io.grpc.MethodDescriptor;
     60 import io.grpc.NameResolver;
     61 import io.grpc.Status;
     62 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
     63 import io.grpc.internal.RetriableStream.ChannelBufferMeter;
     64 import io.grpc.internal.RetriableStream.Throttle;
     65 import java.net.URI;
     66 import java.net.URISyntaxException;
     67 import java.util.ArrayList;
     68 import java.util.Collection;
     69 import java.util.Collections;
     70 import java.util.HashSet;
     71 import java.util.List;
     72 import java.util.Map;
     73 import java.util.Set;
     74 import java.util.concurrent.CountDownLatch;
     75 import java.util.concurrent.Executor;
     76 import java.util.concurrent.ScheduledFuture;
     77 import java.util.concurrent.TimeUnit;
     78 import java.util.concurrent.atomic.AtomicBoolean;
     79 import java.util.logging.Level;
     80 import java.util.logging.Logger;
     81 import java.util.regex.Pattern;
     82 import javax.annotation.CheckForNull;
     83 import javax.annotation.Nullable;
     84 import javax.annotation.concurrent.GuardedBy;
     85 import javax.annotation.concurrent.ThreadSafe;
     86 
     87 /** A communication channel for making outgoing RPCs. */
     88 @ThreadSafe
     89 final class ManagedChannelImpl extends ManagedChannel implements
     90     InternalInstrumented<ChannelStats> {
     91   static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
     92 
     93   // Matching this pattern means the target string is a URI target or at least intended to be one.
     94   // A URI target must be an absolute hierarchical URI.
     95   // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
     96   @VisibleForTesting
     97   static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
     98 
     99   static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
    100 
    101   @VisibleForTesting
    102   static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
    103 
    104   @VisibleForTesting
    105   static final Status SHUTDOWN_NOW_STATUS =
    106       Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
    107 
    108   @VisibleForTesting
    109   static final Status SHUTDOWN_STATUS =
    110       Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
    111 
    112   @VisibleForTesting
    113   static final Status SUBCHANNEL_SHUTDOWN_STATUS =
    114       Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
    115 
    116   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
    117   private final String target;
    118   private final NameResolver.Factory nameResolverFactory;
    119   private final Attributes nameResolverParams;
    120   private final LoadBalancer.Factory loadBalancerFactory;
    121   private final ClientTransportFactory transportFactory;
    122   private final Executor executor;
    123   private final ObjectPool<? extends Executor> executorPool;
    124   private final ObjectPool<? extends Executor> oobExecutorPool;
    125   private final TimeProvider timeProvider;
    126   private final int maxTraceEvents;
    127 
    128   private final ChannelExecutor channelExecutor = new ChannelExecutor() {
    129       @Override
    130       void handleUncaughtThrowable(Throwable t) {
    131         super.handleUncaughtThrowable(t);
    132         panic(t);
    133       }
    134     };
    135 
    136   private boolean fullStreamDecompression;
    137 
    138   private final DecompressorRegistry decompressorRegistry;
    139   private final CompressorRegistry compressorRegistry;
    140 
    141   private final Supplier<Stopwatch> stopwatchSupplier;
    142   /** The timeout before entering idle mode. */
    143   private final long idleTimeoutMillis;
    144 
    145   private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
    146 
    147   private final ServiceConfigInterceptor serviceConfigInterceptor;
    148 
    149   private final BackoffPolicy.Provider backoffPolicyProvider;
    150 
    151   /**
    152    * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
    153    * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
    154    * {@link RealChannel}.
    155    */
    156   private final Channel interceptorChannel;
    157   @Nullable private final String userAgent;
    158 
    159   // Only null after channel is terminated. Must be assigned from the channelExecutor.
    160   private NameResolver nameResolver;
    161 
    162   // Must be accessed from the channelExecutor.
    163   private boolean nameResolverStarted;
    164 
    165   // null when channel is in idle mode.  Must be assigned from channelExecutor.
    166   @Nullable
    167   private LbHelperImpl lbHelper;
    168 
    169   // Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor.
    170   // null if channel is in idle mode.
    171   @Nullable
    172   private volatile SubchannelPicker subchannelPicker;
    173 
    174   // Must be accessed from the channelExecutor
    175   private boolean panicMode;
    176 
    177   // Must be mutated from channelExecutor
    178   // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
    179   // switch to a ConcurrentHashMap.
    180   private final Set<InternalSubchannel> subchannels = new HashSet<InternalSubchannel>(16, .75f);
    181 
    182   // Must be mutated from channelExecutor
    183   private final Set<OobChannel> oobChannels = new HashSet<OobChannel>(1, .75f);
    184 
    185   // reprocess() must be run from channelExecutor
    186   private final DelayedClientTransport delayedTransport;
    187   private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
    188       = new UncommittedRetriableStreamsRegistry();
    189 
    190   // Shutdown states.
    191   //
    192   // Channel's shutdown process:
    193   // 1. shutdown(): stop accepting new calls from applications
    194   //   1a shutdown <- true
    195   //   1b subchannelPicker <- null
    196   //   1c delayedTransport.shutdown()
    197   // 2. delayedTransport terminated: stop stream-creation functionality
    198   //   2a terminating <- true
    199   //   2b loadBalancer.shutdown()
    200   //     * LoadBalancer will shutdown subchannels and OOB channels
    201   //   2c loadBalancer <- null
    202   //   2d nameResolver.shutdown()
    203   //   2e nameResolver <- null
    204   // 3. All subchannels and OOB channels terminated: Channel considered terminated
    205 
    206   private final AtomicBoolean shutdown = new AtomicBoolean(false);
    207   // Must only be mutated and read from channelExecutor
    208   private boolean shutdownNowed;
    209   // Must be mutated from channelExecutor
    210   private volatile boolean terminating;
    211   // Must be mutated from channelExecutor
    212   private volatile boolean terminated;
    213   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
    214 
    215   private final CallTracer.Factory callTracerFactory;
    216   private final CallTracer channelCallTracer;
    217   @CheckForNull
    218   private final ChannelTracer channelTracer;
    219   private final InternalChannelz channelz;
    220   @CheckForNull
    221   private Boolean haveBackends; // a flag for doing channel tracing when flipped
    222   @Nullable
    223   private Map<String, Object> lastServiceConfig; // used for channel tracing when value changed
    224 
    225   // One instance per channel.
    226   private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
    227 
    228   @Nullable
    229   private Throttle throttle;
    230 
    231   private final long perRpcBufferLimit;
    232   private final long channelBufferLimit;
    233 
    234   // Temporary false flag that can skip the retry code path.
    235   private final boolean retryEnabled;
    236 
    237   // Called from channelExecutor
    238   private final ManagedClientTransport.Listener delayedTransportListener =
    239       new ManagedClientTransport.Listener() {
    240         @Override
    241         public void transportShutdown(Status s) {
    242           checkState(shutdown.get(), "Channel must have been shut down");
    243         }
    244 
    245         @Override
    246         public void transportReady() {
    247           // Don't care
    248         }
    249 
    250         @Override
    251         public void transportInUse(final boolean inUse) {
    252           inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
    253         }
    254 
    255         @Override
    256         public void transportTerminated() {
    257           checkState(shutdown.get(), "Channel must have been shut down");
    258           terminating = true;
    259           shutdownNameResolverAndLoadBalancer(false);
    260           // No need to call channelStateManager since we are already in SHUTDOWN state.
    261           // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
    262           // here.
    263           maybeShutdownNowSubchannels();
    264           maybeTerminateChannel();
    265         }
    266       };
    267 
    268   // Must be called from channelExecutor
    269   private void maybeShutdownNowSubchannels() {
    270     if (shutdownNowed) {
    271       for (InternalSubchannel subchannel : subchannels) {
    272         subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
    273       }
    274       for (OobChannel oobChannel : oobChannels) {
    275         oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
    276       }
    277     }
    278   }
    279 
    280   // Must be accessed from channelExecutor
    281   @VisibleForTesting
    282   final InUseStateAggregator<Object> inUseStateAggregator =
    283       new InUseStateAggregator<Object>() {
    284         @Override
    285         void handleInUse() {
    286           exitIdleMode();
    287         }
    288 
    289         @Override
    290         void handleNotInUse() {
    291           if (shutdown.get()) {
    292             return;
    293           }
    294           rescheduleIdleTimer();
    295         }
    296       };
    297 
    298   @Override
    299   public ListenableFuture<ChannelStats> getStats() {
    300     final SettableFuture<ChannelStats> ret = SettableFuture.create();
    301     // subchannels and oobchannels can only be accessed from channelExecutor
    302     channelExecutor.executeLater(new Runnable() {
    303       @Override
    304       public void run() {
    305         ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
    306         channelCallTracer.updateBuilder(builder);
    307         if (channelTracer != null) {
    308           channelTracer.updateBuilder(builder);
    309         }
    310         builder.setTarget(target).setState(channelStateManager.getState());
    311         List<InternalWithLogId> children = new ArrayList<>();
    312         children.addAll(subchannels);
    313         children.addAll(oobChannels);
    314         builder.setSubchannels(children);
    315         ret.set(builder.build());
    316       }
    317     }).drain();
    318     return ret;
    319   }
    320 
    321   @Override
    322   public InternalLogId getLogId() {
    323     return logId;
    324   }
    325 
    326   // Run from channelExecutor
    327   private class IdleModeTimer implements Runnable {
    328 
    329     @Override
    330     public void run() {
    331       enterIdleMode();
    332     }
    333   }
    334 
    335   // Must be called from channelExecutor
    336   private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
    337     if (verifyActive) {
    338       checkState(nameResolver != null, "nameResolver is null");
    339       checkState(lbHelper != null, "lbHelper is null");
    340     }
    341     if (nameResolver != null) {
    342       cancelNameResolverBackoff();
    343       nameResolver.shutdown();
    344       nameResolver = null;
    345       nameResolverStarted = false;
    346     }
    347     if (lbHelper != null) {
    348       lbHelper.lb.shutdown();
    349       lbHelper = null;
    350     }
    351     subchannelPicker = null;
    352   }
    353 
    354   /**
    355    * Make the channel exit idle mode, if it's in it.
    356    *
    357    * <p>Must be called from channelExecutor
    358    */
    359   @VisibleForTesting
    360   void exitIdleMode() {
    361     if (shutdown.get() || panicMode) {
    362       return;
    363     }
    364     if (inUseStateAggregator.isInUse()) {
    365       // Cancel the timer now, so that a racing due timer will not put Channel on idleness
    366       // when the caller of exitIdleMode() is about to use the returned loadBalancer.
    367       cancelIdleTimer(false);
    368     } else {
    369       // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
    370       // isInUse() == false, in which case we still need to schedule the timer.
    371       rescheduleIdleTimer();
    372     }
    373     if (lbHelper != null) {
    374       return;
    375     }
    376     logger.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
    377     lbHelper = new LbHelperImpl(nameResolver);
    378     lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
    379 
    380     NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
    381     try {
    382       nameResolver.start(listener);
    383       nameResolverStarted = true;
    384     } catch (Throwable t) {
    385       listener.onError(Status.fromThrowable(t));
    386     }
    387   }
    388 
    389   // Must be run from channelExecutor
    390   private void enterIdleMode() {
    391     logger.log(Level.FINE, "[{0}] Entering idle mode", getLogId());
    392     // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
    393     // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
    394     // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
    395     // which are bugs.
    396     shutdownNameResolverAndLoadBalancer(true);
    397     delayedTransport.reprocess(null);
    398     nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
    399     if (channelTracer != null) {
    400       channelTracer.reportEvent(
    401           new ChannelTrace.Event.Builder()
    402               .setDescription("Entering IDLE state")
    403               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
    404               .setTimestampNanos(timeProvider.currentTimeNanos())
    405               .build());
    406     }
    407     channelStateManager.gotoState(IDLE);
    408     if (inUseStateAggregator.isInUse()) {
    409       exitIdleMode();
    410     }
    411   }
    412 
    413   // Must be run from channelExecutor
    414   private void cancelIdleTimer(boolean permanent) {
    415     idleTimer.cancel(permanent);
    416   }
    417 
    418   // Always run from channelExecutor
    419   private void rescheduleIdleTimer() {
    420     if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
    421       return;
    422     }
    423     idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    424   }
    425 
    426   // Run from channelExecutor
    427   @VisibleForTesting
    428   class NameResolverRefresh implements Runnable {
    429     // Only mutated from channelExecutor
    430     boolean cancelled;
    431 
    432     @Override
    433     public void run() {
    434       if (cancelled) {
    435         // Race detected: this task was scheduled on channelExecutor before
    436         // cancelNameResolverBackoff() could cancel the timer.
    437         return;
    438       }
    439       nameResolverRefreshFuture = null;
    440       nameResolverRefresh = null;
    441       if (nameResolver != null) {
    442         nameResolver.refresh();
    443       }
    444     }
    445   }
    446 
    447   // Must be used from channelExecutor
    448   @Nullable private ScheduledFuture<?> nameResolverRefreshFuture;
    449   // Must be used from channelExecutor
    450   @Nullable private NameResolverRefresh nameResolverRefresh;
    451   // The policy to control backoff between name resolution attempts. Non-null when an attempt is
    452   // scheduled. Must be used from channelExecutor
    453   @Nullable private BackoffPolicy nameResolverBackoffPolicy;
    454 
    455   // Must be run from channelExecutor
    456   private void cancelNameResolverBackoff() {
    457     if (nameResolverRefreshFuture != null) {
    458       nameResolverRefreshFuture.cancel(false);
    459       nameResolverRefresh.cancelled = true;
    460       nameResolverRefreshFuture = null;
    461       nameResolverRefresh = null;
    462       nameResolverBackoffPolicy = null;
    463     }
    464   }
    465 
    466   private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
    467     @Override
    468     public ClientTransport get(PickSubchannelArgs args) {
    469       SubchannelPicker pickerCopy = subchannelPicker;
    470       if (shutdown.get()) {
    471         // If channel is shut down, delayedTransport is also shut down which will fail the stream
    472         // properly.
    473         return delayedTransport;
    474       }
    475       if (pickerCopy == null) {
    476         channelExecutor.executeLater(new Runnable() {
    477             @Override
    478             public void run() {
    479               exitIdleMode();
    480             }
    481           }).drain();
    482         return delayedTransport;
    483       }
    484       // There is no need to reschedule the idle timer here.
    485       //
    486       // pickerCopy != null, which means idle timer has not expired when this method starts.
    487       // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
    488       // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
    489       // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
    490       //
    491       // In most cases the idle timer is scheduled to fire after the transport has created the
    492       // stream, which would have reported in-use state to the channel that would have cancelled
    493       // the idle timer.
    494       PickResult pickResult = pickerCopy.pickSubchannel(args);
    495       ClientTransport transport = GrpcUtil.getTransportFromPickResult(
    496           pickResult, args.getCallOptions().isWaitForReady());
    497       if (transport != null) {
    498         return transport;
    499       }
    500       return delayedTransport;
    501     }
    502 
    503     @Override
    504     public <ReqT> RetriableStream<ReqT> newRetriableStream(
    505         final MethodDescriptor<ReqT, ?> method,
    506         final CallOptions callOptions,
    507         final Metadata headers,
    508         final Context context) {
    509       checkState(retryEnabled, "retry should be enabled");
    510       return new RetriableStream<ReqT>(
    511           method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
    512           getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
    513           callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY),
    514           throttle) {
    515         @Override
    516         Status prestart() {
    517           return uncommittedRetriableStreamsRegistry.add(this);
    518         }
    519 
    520         @Override
    521         void postCommit() {
    522           uncommittedRetriableStreamsRegistry.remove(this);
    523         }
    524 
    525         @Override
    526         ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) {
    527           CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory);
    528           ClientTransport transport =
    529               get(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
    530           Context origContext = context.attach();
    531           try {
    532             return transport.newStream(method, newHeaders, newOptions);
    533           } finally {
    534             context.detach(origContext);
    535           }
    536         }
    537       };
    538     }
    539   };
    540 
    541   private final Rescheduler idleTimer;
    542 
    543   ManagedChannelImpl(
    544       AbstractManagedChannelImplBuilder<?> builder,
    545       ClientTransportFactory clientTransportFactory,
    546       BackoffPolicy.Provider backoffPolicyProvider,
    547       ObjectPool<? extends Executor> oobExecutorPool,
    548       Supplier<Stopwatch> stopwatchSupplier,
    549       List<ClientInterceptor> interceptors,
    550       final TimeProvider timeProvider) {
    551     this.target = checkNotNull(builder.target, "target");
    552     this.nameResolverFactory = builder.getNameResolverFactory();
    553     this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
    554     this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
    555     if (builder.loadBalancerFactory == null) {
    556       this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
    557     } else {
    558       this.loadBalancerFactory = builder.loadBalancerFactory;
    559     }
    560     this.executorPool = checkNotNull(builder.executorPool, "executorPool");
    561     this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
    562     this.executor = checkNotNull(executorPool.getObject(), "executor");
    563     this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
    564     this.delayedTransport.start(delayedTransportListener);
    565     this.backoffPolicyProvider = backoffPolicyProvider;
    566     this.transportFactory =
    567         new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
    568     this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
    569     serviceConfigInterceptor = new ServiceConfigInterceptor(
    570         retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
    571     Channel channel = new RealChannel();
    572     channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
    573     if (builder.binlog != null) {
    574       channel = builder.binlog.wrapChannel(channel);
    575     }
    576     this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
    577     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
    578     if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
    579       this.idleTimeoutMillis = builder.idleTimeoutMillis;
    580     } else {
    581       checkArgument(
    582           builder.idleTimeoutMillis
    583               >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
    584           "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
    585       this.idleTimeoutMillis = builder.idleTimeoutMillis;
    586     }
    587 
    588     final class AutoDrainChannelExecutor implements Executor {
    589 
    590       @Override
    591       public void execute(Runnable command) {
    592         channelExecutor.executeLater(command);
    593         channelExecutor.drain();
    594       }
    595     }
    596 
    597     idleTimer = new Rescheduler(
    598         new IdleModeTimer(),
    599         new AutoDrainChannelExecutor(),
    600         transportFactory.getScheduledExecutorService(),
    601         stopwatchSupplier.get());
    602     this.fullStreamDecompression = builder.fullStreamDecompression;
    603     this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
    604     this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
    605     this.userAgent = builder.userAgent;
    606 
    607     this.channelBufferLimit = builder.retryBufferSize;
    608     this.perRpcBufferLimit = builder.perRpcBufferLimit;
    609     this.timeProvider = checkNotNull(timeProvider, "timeProvider");
    610     this.callTracerFactory = new CallTracer.Factory() {
    611       @Override
    612       public CallTracer create() {
    613         return new CallTracer(timeProvider);
    614       }
    615     };
    616     channelCallTracer = callTracerFactory.create();
    617     this.channelz = checkNotNull(builder.channelz);
    618     channelz.addRootChannel(this);
    619 
    620     maxTraceEvents = builder.maxTraceEvents;
    621     if (maxTraceEvents > 0) {
    622       long currentTimeNanos = timeProvider.currentTimeNanos();
    623       channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel");
    624     } else {
    625       channelTracer = null;
    626     }
    627     logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
    628   }
    629 
    630   @VisibleForTesting
    631   static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
    632       Attributes nameResolverParams) {
    633     // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
    634     // "dns:///".
    635     URI targetUri = null;
    636     StringBuilder uriSyntaxErrors = new StringBuilder();
    637     try {
    638       targetUri = new URI(target);
    639       // For "localhost:8080" this would likely cause newNameResolver to return null, because
    640       // "localhost" is parsed as the scheme. Will fall into the next branch and try
    641       // "dns:///localhost:8080".
    642     } catch (URISyntaxException e) {
    643       // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
    644       uriSyntaxErrors.append(e.getMessage());
    645     }
    646     if (targetUri != null) {
    647       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
    648       if (resolver != null) {
    649         return resolver;
    650       }
    651       // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
    652       // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
    653     }
    654 
    655     // If we reached here, the targetUri couldn't be used.
    656     if (!URI_PATTERN.matcher(target).matches()) {
    657       // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
    658       // scheme from the factory.
    659       try {
    660         targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
    661       } catch (URISyntaxException e) {
    662         // Should not be possible.
    663         throw new IllegalArgumentException(e);
    664       }
    665       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
    666       if (resolver != null) {
    667         return resolver;
    668       }
    669     }
    670     throw new IllegalArgumentException(String.format(
    671         "cannot find a NameResolver for %s%s",
    672         target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
    673   }
    674 
    675   /**
    676    * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
    677    * cancelled.
    678    */
    679   @Override
    680   public ManagedChannelImpl shutdown() {
    681     logger.log(Level.FINE, "[{0}] shutdown() called", getLogId());
    682     if (!shutdown.compareAndSet(false, true)) {
    683       return this;
    684     }
    685 
    686     // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
    687     // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
    688     // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
    689     // channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
    690     channelExecutor.executeLater(new Runnable() {
    691       @Override
    692       public void run() {
    693         if (channelTracer != null) {
    694           channelTracer.reportEvent(new ChannelTrace.Event.Builder()
    695               .setDescription("Entering SHUTDOWN state")
    696               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
    697               .setTimestampNanos(timeProvider.currentTimeNanos())
    698               .build());
    699         }
    700         channelStateManager.gotoState(SHUTDOWN);
    701       }
    702     });
    703 
    704     uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
    705     channelExecutor.executeLater(new Runnable() {
    706         @Override
    707         public void run() {
    708           cancelIdleTimer(/* permanent= */ true);
    709         }
    710       }).drain();
    711     logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
    712     return this;
    713   }
    714 
    715   /**
    716    * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
    717    * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
    718    * return {@code false} immediately after this method returns.
    719    */
    720   @Override
    721   public ManagedChannelImpl shutdownNow() {
    722     logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
    723     shutdown();
    724     uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
    725     channelExecutor.executeLater(new Runnable() {
    726         @Override
    727         public void run() {
    728           if (shutdownNowed) {
    729             return;
    730           }
    731           shutdownNowed = true;
    732           maybeShutdownNowSubchannels();
    733         }
    734       }).drain();
    735     return this;
    736   }
    737 
    738   // Called from channelExecutor
    739   @VisibleForTesting
    740   void panic(final Throwable t) {
    741     if (panicMode) {
    742       // Preserve the first panic information
    743       return;
    744     }
    745     panicMode = true;
    746     cancelIdleTimer(/* permanent= */ true);
    747     shutdownNameResolverAndLoadBalancer(false);
    748     SubchannelPicker newPicker = new SubchannelPicker() {
    749       final PickResult panicPickResult =
    750           PickResult.withDrop(
    751               Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
    752       @Override
    753       public PickResult pickSubchannel(PickSubchannelArgs args) {
    754         return panicPickResult;
    755       }
    756     };
    757     updateSubchannelPicker(newPicker);
    758     if (channelTracer != null) {
    759       channelTracer.reportEvent(
    760           new ChannelTrace.Event.Builder()
    761               .setDescription("Entering TRANSIENT_FAILURE state")
    762               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
    763               .setTimestampNanos(timeProvider.currentTimeNanos())
    764               .build());
    765     }
    766     channelStateManager.gotoState(TRANSIENT_FAILURE);
    767   }
    768 
    769   // Called from channelExecutor
    770   private void updateSubchannelPicker(SubchannelPicker newPicker) {
    771     subchannelPicker = newPicker;
    772     delayedTransport.reprocess(newPicker);
    773   }
    774 
    775   @Override
    776   public boolean isShutdown() {
    777     return shutdown.get();
    778   }
    779 
    780   @Override
    781   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    782     return terminatedLatch.await(timeout, unit);
    783   }
    784 
    785   @Override
    786   public boolean isTerminated() {
    787     return terminated;
    788   }
    789 
    790   /*
    791    * Creates a new outgoing call on the channel.
    792    */
    793   @Override
    794   public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
    795       CallOptions callOptions) {
    796     return interceptorChannel.newCall(method, callOptions);
    797   }
    798 
    799   @Override
    800   public String authority() {
    801     return interceptorChannel.authority();
    802   }
    803 
    804   private Executor getCallExecutor(CallOptions callOptions) {
    805     Executor executor = callOptions.getExecutor();
    806     if (executor == null) {
    807       executor = this.executor;
    808     }
    809     return executor;
    810   }
    811 
    812   private class RealChannel extends Channel {
    813     @Override
    814     public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
    815         CallOptions callOptions) {
    816       return new ClientCallImpl<ReqT, RespT>(
    817               method,
    818               getCallExecutor(callOptions),
    819               callOptions,
    820               transportProvider,
    821               terminated ? null : transportFactory.getScheduledExecutorService(),
    822               channelCallTracer,
    823               retryEnabled)
    824           .setFullStreamDecompression(fullStreamDecompression)
    825           .setDecompressorRegistry(decompressorRegistry)
    826           .setCompressorRegistry(compressorRegistry);
    827     }
    828 
    829     @Override
    830     public String authority() {
    831       String authority = nameResolver.getServiceAuthority();
    832       return checkNotNull(authority, "authority");
    833     }
    834   }
    835 
    836   /**
    837    * Terminate the channel if termination conditions are met.
    838    */
    839   // Must be run from channelExecutor
    840   private void maybeTerminateChannel() {
    841     if (terminated) {
    842       return;
    843     }
    844     if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
    845       logger.log(Level.FINE, "[{0}] Terminated", getLogId());
    846       channelz.removeRootChannel(this);
    847       terminated = true;
    848       terminatedLatch.countDown();
    849       executorPool.returnObject(executor);
    850       // Release the transport factory so that it can deallocate any resources.
    851       transportFactory.close();
    852     }
    853   }
    854 
    855   @Override
    856   public ConnectivityState getState(boolean requestConnection) {
    857     ConnectivityState savedChannelState = channelStateManager.getState();
    858     if (requestConnection && savedChannelState == IDLE) {
    859       channelExecutor.executeLater(
    860           new Runnable() {
    861             @Override
    862             public void run() {
    863               exitIdleMode();
    864               if (subchannelPicker != null) {
    865                 subchannelPicker.requestConnection();
    866               }
    867             }
    868           }).drain();
    869     }
    870     return savedChannelState;
    871   }
    872 
    873   @Override
    874   public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
    875     channelExecutor.executeLater(
    876         new Runnable() {
    877           @Override
    878           public void run() {
    879             channelStateManager.notifyWhenStateChanged(callback, executor, source);
    880           }
    881         }).drain();
    882   }
    883 
    884   @Override
    885   public void resetConnectBackoff() {
    886     channelExecutor
    887         .executeLater(
    888             new Runnable() {
    889               @Override
    890               public void run() {
    891                 if (shutdown.get()) {
    892                   return;
    893                 }
    894                 if (nameResolverRefreshFuture != null) {
    895                   checkState(nameResolverStarted, "name resolver must be started");
    896                   cancelNameResolverBackoff();
    897                   nameResolver.refresh();
    898                 }
    899                 for (InternalSubchannel subchannel : subchannels) {
    900                   subchannel.resetConnectBackoff();
    901                 }
    902                 for (OobChannel oobChannel : oobChannels) {
    903                   oobChannel.resetConnectBackoff();
    904                 }
    905               }
    906             })
    907         .drain();
    908   }
    909 
    910   @Override
    911   public void enterIdle() {
    912     class PrepareToLoseNetworkRunnable implements Runnable {
    913       @Override
    914       public void run() {
    915         if (shutdown.get() || lbHelper == null) {
    916           return;
    917         }
    918         cancelIdleTimer(/* permanent= */ false);
    919         enterIdleMode();
    920       }
    921     }
    922 
    923     channelExecutor.executeLater(new PrepareToLoseNetworkRunnable()).drain();
    924   }
    925 
    926   /**
    927    * A registry that prevents channel shutdown from killing existing retry attempts that are in
    928    * backoff.
    929    */
    930   private final class UncommittedRetriableStreamsRegistry {
    931     // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
    932     // it's worthwhile to look for a lock-free approach.
    933     final Object lock = new Object();
    934 
    935     @GuardedBy("lock")
    936     Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>();
    937 
    938     @GuardedBy("lock")
    939     Status shutdownStatus;
    940 
    941     void onShutdown(Status reason) {
    942       boolean shouldShutdownDelayedTransport = false;
    943       synchronized (lock) {
    944         if (shutdownStatus != null) {
    945           return;
    946         }
    947         shutdownStatus = reason;
    948         // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
    949         // retriable streams, which may be in backoff and not using any transport, are already
    950         // started RPCs.
    951         if (uncommittedRetriableStreams.isEmpty()) {
    952           shouldShutdownDelayedTransport = true;
    953         }
    954       }
    955 
    956       if (shouldShutdownDelayedTransport) {
    957         delayedTransport.shutdown(reason);
    958       }
    959     }
    960 
    961     void onShutdownNow(Status reason) {
    962       onShutdown(reason);
    963       Collection<ClientStream> streams;
    964 
    965       synchronized (lock) {
    966         streams = new ArrayList<>(uncommittedRetriableStreams);
    967       }
    968 
    969       for (ClientStream stream : streams) {
    970         stream.cancel(reason);
    971       }
    972       delayedTransport.shutdownNow(reason);
    973     }
    974 
    975     /**
    976      * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
    977      * shutdown Status.
    978      */
    979     @Nullable
    980     Status add(RetriableStream<?> retriableStream) {
    981       synchronized (lock) {
    982         if (shutdownStatus != null) {
    983           return shutdownStatus;
    984         }
    985         uncommittedRetriableStreams.add(retriableStream);
    986         return null;
    987       }
    988     }
    989 
    990     void remove(RetriableStream<?> retriableStream) {
    991       Status shutdownStatusCopy = null;
    992 
    993       synchronized (lock) {
    994         uncommittedRetriableStreams.remove(retriableStream);
    995         if (uncommittedRetriableStreams.isEmpty()) {
    996           shutdownStatusCopy = shutdownStatus;
    997           // Because retriable transport is long-lived, we take this opportunity to down-size the
    998           // hashmap.
    999           uncommittedRetriableStreams = new HashSet<ClientStream>();
   1000         }
   1001       }
   1002 
   1003       if (shutdownStatusCopy != null) {
   1004         delayedTransport.shutdown(shutdownStatusCopy);
   1005       }
   1006     }
   1007   }
   1008 
   1009   private class LbHelperImpl extends LoadBalancer.Helper {
   1010     LoadBalancer lb;
   1011     final NameResolver nr;
   1012 
   1013     LbHelperImpl(NameResolver nr) {
   1014       this.nr = checkNotNull(nr, "NameResolver");
   1015     }
   1016 
   1017     // Must be called from channelExecutor
   1018     private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
   1019       if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
   1020         nr.refresh();
   1021       }
   1022     }
   1023 
   1024     @Override
   1025     public AbstractSubchannel createSubchannel(
   1026         List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
   1027       checkNotNull(addressGroups, "addressGroups");
   1028       checkNotNull(attrs, "attrs");
   1029       // TODO(ejona): can we be even stricter? Like loadBalancer == null?
   1030       checkState(!terminated, "Channel is terminated");
   1031       final SubchannelImpl subchannel = new SubchannelImpl(attrs);
   1032       ChannelTracer subchannelTracer = null;
   1033       long subchannelCreationTime = timeProvider.currentTimeNanos();
   1034       if (maxTraceEvents > 0) {
   1035         subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
   1036       }
   1037       final InternalSubchannel internalSubchannel = new InternalSubchannel(
   1038           addressGroups,
   1039           authority(),
   1040           userAgent,
   1041           backoffPolicyProvider,
   1042           transportFactory,
   1043           transportFactory.getScheduledExecutorService(),
   1044           stopwatchSupplier,
   1045           channelExecutor,
   1046           new InternalSubchannel.Callback() {
   1047             // All callbacks are run in channelExecutor
   1048             @Override
   1049             void onTerminated(InternalSubchannel is) {
   1050               subchannels.remove(is);
   1051               channelz.removeSubchannel(is);
   1052               maybeTerminateChannel();
   1053             }
   1054 
   1055             @Override
   1056             void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
   1057               handleInternalSubchannelState(newState);
   1058               // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
   1059               if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
   1060                 lb.handleSubchannelState(subchannel, newState);
   1061               }
   1062             }
   1063 
   1064             @Override
   1065             void onInUse(InternalSubchannel is) {
   1066               inUseStateAggregator.updateObjectInUse(is, true);
   1067             }
   1068 
   1069             @Override
   1070             void onNotInUse(InternalSubchannel is) {
   1071               inUseStateAggregator.updateObjectInUse(is, false);
   1072             }
   1073           },
   1074           channelz,
   1075           callTracerFactory.create(),
   1076           subchannelTracer,
   1077           timeProvider);
   1078       if (channelTracer != null) {
   1079         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1080             .setDescription("Child channel created")
   1081             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1082             .setTimestampNanos(subchannelCreationTime)
   1083             .setSubchannelRef(internalSubchannel)
   1084             .build());
   1085       }
   1086       channelz.addSubchannel(internalSubchannel);
   1087       subchannel.subchannel = internalSubchannel;
   1088       logger.log(Level.FINE, "[{0}] {1} created for {2}",
   1089           new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
   1090       runSerialized(new Runnable() {
   1091           @Override
   1092           public void run() {
   1093             if (terminating) {
   1094               // Because runSerialized() doesn't guarantee the runnable has been executed upon when
   1095               // returning, the subchannel may still be returned to the balancer without being
   1096               // shutdown even if "terminating" is already true.  The subchannel will not be used in
   1097               // this case, because delayed transport has terminated when "terminating" becomes
   1098               // true, and no more requests will be sent to balancer beyond this point.
   1099               internalSubchannel.shutdown(SHUTDOWN_STATUS);
   1100             }
   1101             if (!terminated) {
   1102               // If channel has not terminated, it will track the subchannel and block termination
   1103               // for it.
   1104               subchannels.add(internalSubchannel);
   1105             }
   1106           }
   1107         });
   1108       return subchannel;
   1109     }
   1110 
   1111     @Override
   1112     public void updateBalancingState(
   1113         final ConnectivityState newState, final SubchannelPicker newPicker) {
   1114       checkNotNull(newState, "newState");
   1115       checkNotNull(newPicker, "newPicker");
   1116 
   1117       runSerialized(
   1118           new Runnable() {
   1119             @Override
   1120             public void run() {
   1121               if (LbHelperImpl.this != lbHelper) {
   1122                 return;
   1123               }
   1124               updateSubchannelPicker(newPicker);
   1125               // It's not appropriate to report SHUTDOWN state from lb.
   1126               // Ignore the case of newState == SHUTDOWN for now.
   1127               if (newState != SHUTDOWN) {
   1128                 if (channelTracer != null) {
   1129                   channelTracer.reportEvent(
   1130                       new ChannelTrace.Event.Builder()
   1131                           .setDescription("Entering " + newState + " state")
   1132                           .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1133                           .setTimestampNanos(timeProvider.currentTimeNanos())
   1134                           .build());
   1135                 }
   1136                 channelStateManager.gotoState(newState);
   1137               }
   1138             }
   1139           });
   1140     }
   1141 
   1142     @Override
   1143     public void updateSubchannelAddresses(
   1144         LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
   1145       checkArgument(subchannel instanceof SubchannelImpl,
   1146           "subchannel must have been returned from createSubchannel");
   1147       ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
   1148     }
   1149 
   1150     @Override
   1151     public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
   1152       // TODO(ejona): can we be even stricter? Like terminating?
   1153       checkState(!terminated, "Channel is terminated");
   1154       long oobChannelCreationTime = timeProvider.currentTimeNanos();
   1155       ChannelTracer oobChannelTracer = null;
   1156       ChannelTracer subchannelTracer = null;
   1157       if (channelTracer != null) {
   1158         oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel");
   1159       }
   1160       final OobChannel oobChannel = new OobChannel(
   1161           authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
   1162           channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
   1163       if (channelTracer != null) {
   1164         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1165             .setDescription("Child channel created")
   1166             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1167             .setTimestampNanos(oobChannelCreationTime)
   1168             .setChannelRef(oobChannel)
   1169             .build());
   1170         subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
   1171       }
   1172       final InternalSubchannel internalSubchannel = new InternalSubchannel(
   1173           Collections.singletonList(addressGroup),
   1174           authority, userAgent, backoffPolicyProvider, transportFactory,
   1175           transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
   1176           // All callback methods are run from channelExecutor
   1177           new InternalSubchannel.Callback() {
   1178             @Override
   1179             void onTerminated(InternalSubchannel is) {
   1180               oobChannels.remove(oobChannel);
   1181               channelz.removeSubchannel(is);
   1182               oobChannel.handleSubchannelTerminated();
   1183               maybeTerminateChannel();
   1184             }
   1185 
   1186             @Override
   1187             void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
   1188               handleInternalSubchannelState(newState);
   1189               oobChannel.handleSubchannelStateChange(newState);
   1190             }
   1191           },
   1192           channelz,
   1193           callTracerFactory.create(),
   1194           subchannelTracer,
   1195           timeProvider);
   1196       if (oobChannelTracer != null) {
   1197         oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1198             .setDescription("Child channel created")
   1199             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1200             .setTimestampNanos(oobChannelCreationTime)
   1201             .setSubchannelRef(internalSubchannel)
   1202             .build());
   1203       }
   1204       channelz.addSubchannel(oobChannel);
   1205       channelz.addSubchannel(internalSubchannel);
   1206       oobChannel.setSubchannel(internalSubchannel);
   1207       runSerialized(new Runnable() {
   1208           @Override
   1209           public void run() {
   1210             if (terminating) {
   1211               oobChannel.shutdown();
   1212             }
   1213             if (!terminated) {
   1214               // If channel has not terminated, it will track the subchannel and block termination
   1215               // for it.
   1216               oobChannels.add(oobChannel);
   1217             }
   1218           }
   1219         });
   1220       return oobChannel;
   1221     }
   1222 
   1223     @Override
   1224     public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
   1225       checkArgument(channel instanceof OobChannel,
   1226           "channel must have been returned from createOobChannel");
   1227       ((OobChannel) channel).updateAddresses(eag);
   1228     }
   1229 
   1230     @Override
   1231     public String getAuthority() {
   1232       return ManagedChannelImpl.this.authority();
   1233     }
   1234 
   1235     @Override
   1236     public NameResolver.Factory getNameResolverFactory() {
   1237       return nameResolverFactory;
   1238     }
   1239 
   1240     @Override
   1241     public void runSerialized(Runnable task) {
   1242       channelExecutor.executeLater(task).drain();
   1243     }
   1244   }
   1245 
   1246   private class NameResolverListenerImpl implements NameResolver.Listener {
   1247     final LbHelperImpl helper;
   1248 
   1249     NameResolverListenerImpl(LbHelperImpl helperImpl) {
   1250       this.helper = helperImpl;
   1251     }
   1252 
   1253     @Override
   1254     public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
   1255       if (servers.isEmpty()) {
   1256         onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
   1257         return;
   1258       }
   1259       if (logger.isLoggable(Level.FINE)) {
   1260         logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
   1261             new Object[]{getLogId(), servers, config});
   1262       }
   1263 
   1264       if (channelTracer != null && (haveBackends == null || !haveBackends)) {
   1265         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1266             .setDescription("Address resolved: " + servers)
   1267             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1268             .setTimestampNanos(timeProvider.currentTimeNanos())
   1269             .build());
   1270         haveBackends = true;
   1271       }
   1272       final Map<String, Object> serviceConfig =
   1273           config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
   1274       if (channelTracer != null && serviceConfig != null
   1275           && !serviceConfig.equals(lastServiceConfig)) {
   1276         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1277             .setDescription("Service config changed")
   1278             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   1279             .setTimestampNanos(timeProvider.currentTimeNanos())
   1280             .build());
   1281         lastServiceConfig = serviceConfig;
   1282       }
   1283 
   1284       final class NamesResolved implements Runnable {
   1285         @Override
   1286         public void run() {
   1287           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
   1288           if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
   1289             return;
   1290           }
   1291 
   1292           nameResolverBackoffPolicy = null;
   1293 
   1294           if (serviceConfig != null) {
   1295             try {
   1296               serviceConfigInterceptor.handleUpdate(serviceConfig);
   1297               if (retryEnabled) {
   1298                 throttle = getThrottle(config);
   1299               }
   1300             } catch (RuntimeException re) {
   1301               logger.log(
   1302                   Level.WARNING,
   1303                   "[" + getLogId() + "] Unexpected exception from parsing service config",
   1304                   re);
   1305             }
   1306           }
   1307 
   1308           helper.lb.handleResolvedAddressGroups(servers, config);
   1309         }
   1310       }
   1311 
   1312       helper.runSerialized(new NamesResolved());
   1313     }
   1314 
   1315     @Override
   1316     public void onError(final Status error) {
   1317       checkArgument(!error.isOk(), "the error status must not be OK");
   1318       logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
   1319           new Object[] {getLogId(), error});
   1320       if (channelTracer != null && (haveBackends == null || haveBackends)) {
   1321         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
   1322             .setDescription("Failed to resolve name")
   1323             .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
   1324             .setTimestampNanos(timeProvider.currentTimeNanos())
   1325             .build());
   1326         haveBackends = false;
   1327       }
   1328       channelExecutor
   1329           .executeLater(
   1330               new Runnable() {
   1331                 @Override
   1332                 public void run() {
   1333                   // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
   1334                   if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
   1335                     return;
   1336                   }
   1337                   helper.lb.handleNameResolutionError(error);
   1338                   if (nameResolverRefreshFuture != null) {
   1339                     // The name resolver may invoke onError multiple times, but we only want to
   1340                     // schedule one backoff attempt
   1341                     // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
   1342                     // want to reset the backoff interval upon repeated onError() calls
   1343                     return;
   1344                   }
   1345                   if (nameResolverBackoffPolicy == null) {
   1346                     nameResolverBackoffPolicy = backoffPolicyProvider.get();
   1347                   }
   1348                   long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
   1349                   if (logger.isLoggable(Level.FINE)) {
   1350                     logger.log(
   1351                         Level.FINE,
   1352                         "[{0}] Scheduling DNS resolution backoff for {1} ns",
   1353                         new Object[] {logId, delayNanos});
   1354                   }
   1355                   nameResolverRefresh = new NameResolverRefresh();
   1356                   nameResolverRefreshFuture =
   1357                       transportFactory
   1358                           .getScheduledExecutorService()
   1359                           .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
   1360                 }
   1361               })
   1362           .drain();
   1363     }
   1364   }
   1365 
   1366   @Nullable
   1367   private static Throttle getThrottle(Attributes config) {
   1368     return ServiceConfigUtil.getThrottlePolicy(
   1369         config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG));
   1370   }
   1371 
   1372   private final class SubchannelImpl extends AbstractSubchannel {
   1373     // Set right after SubchannelImpl is created.
   1374     InternalSubchannel subchannel;
   1375     final Object shutdownLock = new Object();
   1376     final Attributes attrs;
   1377 
   1378     @GuardedBy("shutdownLock")
   1379     boolean shutdownRequested;
   1380     @GuardedBy("shutdownLock")
   1381     ScheduledFuture<?> delayedShutdownTask;
   1382 
   1383     SubchannelImpl(Attributes attrs) {
   1384       this.attrs = checkNotNull(attrs, "attrs");
   1385     }
   1386 
   1387     @Override
   1388     ClientTransport obtainActiveTransport() {
   1389       return subchannel.obtainActiveTransport();
   1390     }
   1391 
   1392     @Override
   1393     InternalInstrumented<ChannelStats> getInternalSubchannel() {
   1394       return subchannel;
   1395     }
   1396 
   1397     @Override
   1398     public void shutdown() {
   1399       synchronized (shutdownLock) {
   1400         if (shutdownRequested) {
   1401           if (terminating && delayedShutdownTask != null) {
   1402             // shutdown() was previously called when terminating == false, thus a delayed shutdown()
   1403             // was scheduled.  Now since terminating == true, We should expedite the shutdown.
   1404             delayedShutdownTask.cancel(false);
   1405             delayedShutdownTask = null;
   1406             // Will fall through to the subchannel.shutdown() at the end.
   1407           } else {
   1408             return;
   1409           }
   1410         } else {
   1411           shutdownRequested = true;
   1412         }
   1413         // Add a delay to shutdown to deal with the race between 1) a transport being picked and
   1414         // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
   1415         // because of address change, or because LoadBalancer is shutdown by Channel entering idle
   1416         // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
   1417         // shutdown of Subchannel for a few seconds here.
   1418         //
   1419         // TODO(zhangkun83): consider a better approach
   1420         // (https://github.com/grpc/grpc-java/issues/2562).
   1421         if (!terminating) {
   1422           delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
   1423               new LogExceptionRunnable(
   1424                   new Runnable() {
   1425                     @Override
   1426                     public void run() {
   1427                       subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
   1428                     }
   1429                   }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
   1430           return;
   1431         }
   1432       }
   1433       // When terminating == true, no more real streams will be created. It's safe and also
   1434       // desirable to shutdown timely.
   1435       subchannel.shutdown(SHUTDOWN_STATUS);
   1436     }
   1437 
   1438     @Override
   1439     public void requestConnection() {
   1440       subchannel.obtainActiveTransport();
   1441     }
   1442 
   1443     @Override
   1444     public List<EquivalentAddressGroup> getAllAddresses() {
   1445       return subchannel.getAddressGroups();
   1446     }
   1447 
   1448     @Override
   1449     public Attributes getAttributes() {
   1450       return attrs;
   1451     }
   1452 
   1453     @Override
   1454     public String toString() {
   1455       return subchannel.getLogId().toString();
   1456     }
   1457   }
   1458 
   1459   @Override
   1460   public String toString() {
   1461     return MoreObjects.toStringHelper(this)
   1462         .add("logId", logId.getId())
   1463         .add("target", target)
   1464         .toString();
   1465   }
   1466 }
   1467