Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2015 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Preconditions.checkState;
     20 import static com.google.common.truth.Truth.assertThat;
     21 import static io.grpc.ConnectivityState.CONNECTING;
     22 import static io.grpc.ConnectivityState.IDLE;
     23 import static io.grpc.ConnectivityState.READY;
     24 import static io.grpc.ConnectivityState.SHUTDOWN;
     25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
     26 import static junit.framework.TestCase.assertNotSame;
     27 import static org.junit.Assert.assertEquals;
     28 import static org.junit.Assert.assertFalse;
     29 import static org.junit.Assert.assertNotEquals;
     30 import static org.junit.Assert.assertNotNull;
     31 import static org.junit.Assert.assertNull;
     32 import static org.junit.Assert.assertSame;
     33 import static org.junit.Assert.assertTrue;
     34 import static org.mockito.Matchers.any;
     35 import static org.mockito.Matchers.anyObject;
     36 import static org.mockito.Matchers.eq;
     37 import static org.mockito.Matchers.same;
     38 import static org.mockito.Mockito.atLeast;
     39 import static org.mockito.Mockito.doAnswer;
     40 import static org.mockito.Mockito.doThrow;
     41 import static org.mockito.Mockito.inOrder;
     42 import static org.mockito.Mockito.mock;
     43 import static org.mockito.Mockito.never;
     44 import static org.mockito.Mockito.times;
     45 import static org.mockito.Mockito.verify;
     46 import static org.mockito.Mockito.verifyNoMoreInteractions;
     47 import static org.mockito.Mockito.verifyZeroInteractions;
     48 import static org.mockito.Mockito.when;
     49 
     50 import com.google.common.base.Throwables;
     51 import com.google.common.collect.ImmutableList;
     52 import com.google.common.collect.ImmutableMap;
     53 import com.google.common.collect.Iterables;
     54 import com.google.common.util.concurrent.ListenableFuture;
     55 import com.google.common.util.concurrent.MoreExecutors;
     56 import com.google.common.util.concurrent.SettableFuture;
     57 import io.grpc.Attributes;
     58 import io.grpc.BinaryLog;
     59 import io.grpc.CallCredentials;
     60 import io.grpc.CallCredentials.MetadataApplier;
     61 import io.grpc.CallOptions;
     62 import io.grpc.Channel;
     63 import io.grpc.ClientCall;
     64 import io.grpc.ClientInterceptor;
     65 import io.grpc.ClientInterceptors;
     66 import io.grpc.ClientStreamTracer;
     67 import io.grpc.ConnectivityState;
     68 import io.grpc.ConnectivityStateInfo;
     69 import io.grpc.Context;
     70 import io.grpc.EquivalentAddressGroup;
     71 import io.grpc.IntegerMarshaller;
     72 import io.grpc.InternalChannelz;
     73 import io.grpc.InternalChannelz.ChannelStats;
     74 import io.grpc.InternalChannelz.ChannelTrace;
     75 import io.grpc.InternalInstrumented;
     76 import io.grpc.LoadBalancer;
     77 import io.grpc.LoadBalancer.Helper;
     78 import io.grpc.LoadBalancer.PickResult;
     79 import io.grpc.LoadBalancer.PickSubchannelArgs;
     80 import io.grpc.LoadBalancer.Subchannel;
     81 import io.grpc.LoadBalancer.SubchannelPicker;
     82 import io.grpc.ManagedChannel;
     83 import io.grpc.Metadata;
     84 import io.grpc.MethodDescriptor;
     85 import io.grpc.MethodDescriptor.MethodType;
     86 import io.grpc.NameResolver;
     87 import io.grpc.SecurityLevel;
     88 import io.grpc.ServerMethodDefinition;
     89 import io.grpc.Status;
     90 import io.grpc.StringMarshaller;
     91 import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
     92 import io.grpc.internal.TestUtils.MockClientTransportInfo;
     93 import io.grpc.stub.ClientCalls;
     94 import io.grpc.testing.TestMethodDescriptors;
     95 import java.io.IOException;
     96 import java.net.SocketAddress;
     97 import java.net.URI;
     98 import java.util.ArrayList;
     99 import java.util.Arrays;
    100 import java.util.Collections;
    101 import java.util.HashMap;
    102 import java.util.LinkedList;
    103 import java.util.List;
    104 import java.util.Map;
    105 import java.util.Random;
    106 import java.util.concurrent.BlockingQueue;
    107 import java.util.concurrent.ExecutionException;
    108 import java.util.concurrent.Executor;
    109 import java.util.concurrent.TimeUnit;
    110 import java.util.concurrent.atomic.AtomicBoolean;
    111 import java.util.concurrent.atomic.AtomicLong;
    112 import java.util.concurrent.atomic.AtomicReference;
    113 import javax.annotation.Nullable;
    114 import org.junit.After;
    115 import org.junit.Assert;
    116 import org.junit.Assume;
    117 import org.junit.Before;
    118 import org.junit.Rule;
    119 import org.junit.Test;
    120 import org.junit.rules.ExpectedException;
    121 import org.junit.runner.RunWith;
    122 import org.junit.runners.JUnit4;
    123 import org.mockito.ArgumentCaptor;
    124 import org.mockito.Captor;
    125 import org.mockito.InOrder;
    126 import org.mockito.Matchers;
    127 import org.mockito.Mock;
    128 import org.mockito.MockitoAnnotations;
    129 import org.mockito.invocation.InvocationOnMock;
    130 import org.mockito.stubbing.Answer;
    131 
    132 /** Unit tests for {@link ManagedChannelImpl}. */
    133 @RunWith(JUnit4.class)
    134 public class ManagedChannelImplTest {
    135   private static final Attributes NAME_RESOLVER_PARAMS =
    136       Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
    137 
    138   private static final MethodDescriptor<String, Integer> method =
    139       MethodDescriptor.<String, Integer>newBuilder()
    140           .setType(MethodType.UNKNOWN)
    141           .setFullMethodName("service/method")
    142           .setRequestMarshaller(new StringMarshaller())
    143           .setResponseMarshaller(new IntegerMarshaller())
    144           .build();
    145   private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
    146       Attributes.Key.create("subchannel-attr-key");
    147   private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10;
    148   private static final String SERVICE_NAME = "fake.example.com";
    149   private static final String AUTHORITY = SERVICE_NAME;
    150   private static final String USER_AGENT = "userAgent";
    151   private static final ClientTransportOptions clientTransportOptions =
    152       new ClientTransportOptions()
    153           .setAuthority(AUTHORITY)
    154           .setUserAgent(USER_AGENT);
    155   private static final String TARGET = "fake://" + SERVICE_NAME;
    156   private URI expectedUri;
    157   private final SocketAddress socketAddress = new SocketAddress() {};
    158   private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
    159   private final FakeClock timer = new FakeClock();
    160   private final FakeClock executor = new FakeClock();
    161   private final FakeClock oobExecutor = new FakeClock();
    162   private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
    163       new FakeClock.TaskFilter() {
    164         @Override
    165         public boolean shouldAccept(Runnable command) {
    166           return command instanceof ManagedChannelImpl.NameResolverRefresh;
    167         }
    168       };
    169 
    170   private final InternalChannelz channelz = new InternalChannelz();
    171 
    172   @Rule public final ExpectedException thrown = ExpectedException.none();
    173 
    174   private ManagedChannelImpl channel;
    175   private Helper helper;
    176   @Captor
    177   private ArgumentCaptor<Status> statusCaptor;
    178   @Captor
    179   private ArgumentCaptor<CallOptions> callOptionsCaptor;
    180   @Mock
    181   private LoadBalancer.Factory mockLoadBalancerFactory;
    182   @Mock
    183   private LoadBalancer mockLoadBalancer;
    184 
    185   @Captor
    186   private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
    187   @Mock
    188   private SubchannelPicker mockPicker;
    189   @Mock
    190   private ClientTransportFactory mockTransportFactory;
    191   @Mock
    192   private ClientCall.Listener<Integer> mockCallListener;
    193   @Mock
    194   private ClientCall.Listener<Integer> mockCallListener2;
    195   @Mock
    196   private ClientCall.Listener<Integer> mockCallListener3;
    197   @Mock
    198   private ClientCall.Listener<Integer> mockCallListener4;
    199   @Mock
    200   private ClientCall.Listener<Integer> mockCallListener5;
    201   @Mock
    202   private ObjectPool<Executor> executorPool;
    203   @Mock
    204   private ObjectPool<Executor> oobExecutorPool;
    205   @Mock
    206   private CallCredentials creds;
    207   private ChannelBuilder channelBuilder;
    208   private boolean requestConnection = true;
    209   private BlockingQueue<MockClientTransportInfo> transports;
    210 
    211   private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
    212       ArgumentCaptor.forClass(ClientStreamListener.class);
    213 
    214   private void createChannel(ClientInterceptor... interceptors) {
    215     checkState(channel == null);
    216     TimeProvider fakeClockTimeProvider = new TimeProvider() {
    217       @Override
    218       public long currentTimeNanos() {
    219         return timer.getTicker().read();
    220       }
    221     };
    222 
    223     channel = new ManagedChannelImpl(
    224         channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
    225         oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors),
    226         fakeClockTimeProvider);
    227 
    228     if (requestConnection) {
    229       int numExpectedTasks = 0;
    230 
    231       // Force-exit the initial idle-mode
    232       channel.exitIdleMode();
    233       if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) {
    234         numExpectedTasks += 1;
    235       }
    236 
    237       if (getNameResolverRefresh() != null) {
    238         numExpectedTasks += 1;
    239       }
    240 
    241       assertEquals(numExpectedTasks, timer.numPendingTasks());
    242 
    243       ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
    244       verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
    245       helper = helperCaptor.getValue();
    246     }
    247   }
    248 
    249   @Before
    250   public void setUp() throws Exception {
    251     MockitoAnnotations.initMocks(this);
    252     expectedUri = new URI(TARGET);
    253     when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
    254     transports = TestUtils.captureTransports(mockTransportFactory);
    255     when(mockTransportFactory.getScheduledExecutorService())
    256         .thenReturn(timer.getScheduledExecutorService());
    257     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
    258     when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
    259 
    260     channelBuilder = new ChannelBuilder()
    261         .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build())
    262         .loadBalancerFactory(mockLoadBalancerFactory)
    263         .userAgent(USER_AGENT)
    264         .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS);
    265     channelBuilder.executorPool = executorPool;
    266     channelBuilder.binlog = null;
    267     channelBuilder.channelz = channelz;
    268   }
    269 
    270   @After
    271   public void allPendingTasksAreRun() throws Exception {
    272     // The "never" verifications in the tests only hold up if all due tasks are done.
    273     // As for timer, although there may be scheduled tasks in a future time, since we don't test
    274     // any time-related behavior in this test suite, we only care the tasks that are due. This
    275     // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
    276     assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
    277     assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
    278     if (channel != null) {
    279       channel.shutdownNow();
    280       channel = null;
    281     }
    282   }
    283 
    284   @Test
    285   @SuppressWarnings("unchecked")
    286   public void idleModeDisabled() {
    287     channelBuilder.nameResolverFactory(
    288         new FakeNameResolverFactory.Builder(expectedUri)
    289             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
    290             .build());
    291     createChannel();
    292 
    293     // In this test suite, the channel is always created with idle mode disabled.
    294     // No task is scheduled to enter idle mode
    295     assertEquals(0, timer.numPendingTasks());
    296     assertEquals(0, executor.numPendingTasks());
    297   }
    298 
    299   @Test
    300   public void immediateDeadlineExceeded() {
    301     createChannel();
    302     ClientCall<String, Integer> call =
    303         channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
    304     call.start(mockCallListener, new Metadata());
    305     assertEquals(1, executor.runDueTasks());
    306 
    307     verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
    308     Status status = statusCaptor.getValue();
    309     assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
    310   }
    311 
    312   @Test
    313   public void shutdownWithNoTransportsEverCreated() {
    314     channelBuilder.nameResolverFactory(
    315         new FakeNameResolverFactory.Builder(expectedUri)
    316             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
    317             .build());
    318     createChannel();
    319     verify(executorPool).getObject();
    320     verify(executorPool, never()).returnObject(anyObject());
    321     verify(mockTransportFactory).getScheduledExecutorService();
    322     verifyNoMoreInteractions(mockTransportFactory);
    323     channel.shutdown();
    324     assertTrue(channel.isShutdown());
    325     assertTrue(channel.isTerminated());
    326     verify(executorPool).returnObject(executor.getScheduledExecutorService());
    327   }
    328 
    329   @Test
    330   public void channelzMembership() throws Exception {
    331     createChannel();
    332     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
    333     assertFalse(channelz.containsSubchannel(channel.getLogId()));
    334     channel.shutdownNow();
    335     channel.awaitTermination(5, TimeUnit.SECONDS);
    336     assertNull(channelz.getRootChannel(channel.getLogId().getId()));
    337     assertFalse(channelz.containsSubchannel(channel.getLogId()));
    338   }
    339 
    340   @Test
    341   public void channelzMembership_subchannel() throws Exception {
    342     createChannel();
    343     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
    344 
    345     AbstractSubchannel subchannel =
    346         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
    347     // subchannels are not root channels
    348     assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId()));
    349     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    350     assertThat(getStats(channel).subchannels)
    351         .containsExactly(subchannel.getInternalSubchannel());
    352 
    353     subchannel.requestConnection();
    354     MockClientTransportInfo transportInfo = transports.poll();
    355     assertNotNull(transportInfo);
    356     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
    357 
    358     // terminate transport
    359     transportInfo.listener.transportTerminated();
    360     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
    361 
    362     // terminate subchannel
    363     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    364     subchannel.shutdown();
    365     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
    366     timer.runDueTasks();
    367     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    368     assertThat(getStats(channel).subchannels).isEmpty();
    369 
    370     // channel still appears
    371     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
    372   }
    373 
    374   @Test
    375   public void channelzMembership_oob() throws Exception {
    376     createChannel();
    377     OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY);
    378     // oob channels are not root channels
    379     assertNull(channelz.getRootChannel(oob.getLogId().getId()));
    380     assertTrue(channelz.containsSubchannel(oob.getLogId()));
    381     assertThat(getStats(channel).subchannels).containsExactly(oob);
    382     assertTrue(channelz.containsSubchannel(oob.getLogId()));
    383 
    384     AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel();
    385     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    386     assertThat(getStats(oob).subchannels)
    387         .containsExactly(subchannel.getInternalSubchannel());
    388     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    389 
    390     oob.getSubchannel().requestConnection();
    391     MockClientTransportInfo transportInfo = transports.poll();
    392     assertNotNull(transportInfo);
    393     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
    394 
    395     // terminate transport
    396     transportInfo.listener.transportTerminated();
    397     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
    398 
    399     // terminate oobchannel
    400     oob.shutdown();
    401     assertFalse(channelz.containsSubchannel(oob.getLogId()));
    402     assertThat(getStats(channel).subchannels).isEmpty();
    403     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
    404 
    405     // channel still appears
    406     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
    407   }
    408 
    409   @Test
    410   public void callsAndShutdown() {
    411     subtestCallsAndShutdown(false, false);
    412   }
    413 
    414   @Test
    415   public void callsAndShutdownNow() {
    416     subtestCallsAndShutdown(true, false);
    417   }
    418 
    419   /** Make sure shutdownNow() after shutdown() has an effect. */
    420   @Test
    421   public void callsAndShutdownAndShutdownNow() {
    422     subtestCallsAndShutdown(false, true);
    423   }
    424 
    425   private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
    426     FakeNameResolverFactory nameResolverFactory =
    427         new FakeNameResolverFactory.Builder(expectedUri).build();
    428     channelBuilder.nameResolverFactory(nameResolverFactory);
    429     createChannel();
    430     verify(executorPool).getObject();
    431     ClientStream mockStream = mock(ClientStream.class);
    432     ClientStream mockStream2 = mock(ClientStream.class);
    433     Metadata headers = new Metadata();
    434     Metadata headers2 = new Metadata();
    435 
    436     // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
    437     // real transport.
    438     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    439     subchannel.requestConnection();
    440     verify(mockTransportFactory)
    441         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    442     MockClientTransportInfo transportInfo = transports.poll();
    443     ConnectionClientTransport mockTransport = transportInfo.transport;
    444     verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
    445     ManagedClientTransport.Listener transportListener = transportInfo.listener;
    446     when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT)))
    447         .thenReturn(mockStream);
    448     when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT)))
    449         .thenReturn(mockStream2);
    450     transportListener.transportReady();
    451     when(mockPicker.pickSubchannel(
    452         new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
    453         PickResult.withNoResult());
    454     when(mockPicker.pickSubchannel(
    455         new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
    456         PickResult.withSubchannel(subchannel));
    457     helper.updateBalancingState(READY, mockPicker);
    458 
    459     // First RPC, will be pending
    460     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    461     verify(mockTransportFactory)
    462         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    463     call.start(mockCallListener, headers);
    464 
    465     verify(mockTransport, never())
    466         .newStream(same(method), same(headers), same(CallOptions.DEFAULT));
    467 
    468     // Second RPC, will be assigned to the real transport
    469     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
    470     call2.start(mockCallListener2, headers2);
    471     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
    472     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
    473     verify(mockStream2).start(any(ClientStreamListener.class));
    474 
    475     // Shutdown
    476     if (shutdownNow) {
    477       channel.shutdownNow();
    478     } else {
    479       channel.shutdown();
    480       if (shutdownNowAfterShutdown) {
    481         channel.shutdownNow();
    482         shutdownNow = true;
    483       }
    484     }
    485     assertTrue(channel.isShutdown());
    486     assertFalse(channel.isTerminated());
    487     assertEquals(1, nameResolverFactory.resolvers.size());
    488     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
    489 
    490     // Further calls should fail without going to the transport
    491     ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
    492     call3.start(mockCallListener3, headers2);
    493     timer.runDueTasks();
    494     executor.runDueTasks();
    495 
    496     verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
    497     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
    498 
    499     if (shutdownNow) {
    500       // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
    501       verify(mockLoadBalancer).shutdown();
    502       assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
    503       // call should have been aborted by delayed transport
    504       executor.runDueTasks();
    505       verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS),
    506           any(Metadata.class));
    507     } else {
    508       // LoadBalancer and NameResolver are still running.
    509       verify(mockLoadBalancer, never()).shutdown();
    510       assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
    511       // call and call2 are still alive, and can still be assigned to a real transport
    512       SubchannelPicker picker2 = mock(SubchannelPicker.class);
    513       when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
    514           .thenReturn(PickResult.withSubchannel(subchannel));
    515       helper.updateBalancingState(READY, picker2);
    516       executor.runDueTasks();
    517       verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT));
    518       verify(mockStream).start(any(ClientStreamListener.class));
    519     }
    520 
    521     // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
    522     // will be shutdown.
    523     verify(mockLoadBalancer).shutdown();
    524     assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
    525 
    526     if (shutdownNow) {
    527       // Channel shutdownNow() all subchannels after shutting down LoadBalancer
    528       verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS);
    529     } else {
    530       verify(mockTransport, never()).shutdownNow(any(Status.class));
    531     }
    532     // LoadBalancer should shutdown the subchannel
    533     subchannel.shutdown();
    534     if (shutdownNow) {
    535       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS));
    536     } else {
    537       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
    538     }
    539 
    540     // Killing the remaining real transport will terminate the channel
    541     transportListener.transportShutdown(Status.UNAVAILABLE);
    542     assertFalse(channel.isTerminated());
    543     verify(executorPool, never()).returnObject(anyObject());
    544     transportListener.transportTerminated();
    545     assertTrue(channel.isTerminated());
    546     verify(executorPool).returnObject(executor.getScheduledExecutorService());
    547     verifyNoMoreInteractions(oobExecutorPool);
    548 
    549     verify(mockTransportFactory)
    550         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    551     verify(mockTransportFactory).close();
    552     verify(mockTransport, atLeast(0)).getLogId();
    553     verifyNoMoreInteractions(mockTransport);
    554   }
    555 
    556   @Test
    557   public void noMoreCallbackAfterLoadBalancerShutdown() {
    558     FakeNameResolverFactory nameResolverFactory =
    559         new FakeNameResolverFactory.Builder(expectedUri)
    560             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
    561             .build();
    562     channelBuilder.nameResolverFactory(nameResolverFactory);
    563     Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
    564     createChannel();
    565 
    566     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
    567     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
    568     verify(mockLoadBalancer).handleResolvedAddressGroups(
    569         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
    570 
    571     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    572     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    573     subchannel1.requestConnection();
    574     subchannel2.requestConnection();
    575     verify(mockTransportFactory, times(2))
    576         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    577     MockClientTransportInfo transportInfo1 = transports.poll();
    578     MockClientTransportInfo transportInfo2 = transports.poll();
    579 
    580     // LoadBalancer receives all sorts of callbacks
    581     transportInfo1.listener.transportReady();
    582     verify(mockLoadBalancer, times(2))
    583         .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture());
    584     assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
    585     assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
    586 
    587     verify(mockLoadBalancer)
    588         .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture());
    589     assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
    590 
    591     resolver.listener.onError(resolutionError);
    592     verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
    593 
    594     verifyNoMoreInteractions(mockLoadBalancer);
    595 
    596     channel.shutdown();
    597     verify(mockLoadBalancer).shutdown();
    598 
    599     // No more callback should be delivered to LoadBalancer after it's shut down
    600     transportInfo2.listener.transportReady();
    601     resolver.listener.onError(resolutionError);
    602     resolver.resolved();
    603     verifyNoMoreInteractions(mockLoadBalancer);
    604   }
    605 
    606   @Test
    607   public void interceptor() throws Exception {
    608     final AtomicLong atomic = new AtomicLong();
    609     ClientInterceptor interceptor = new ClientInterceptor() {
    610       @Override
    611       public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
    612           MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
    613           Channel next) {
    614         atomic.set(1);
    615         return next.newCall(method, callOptions);
    616       }
    617     };
    618     createChannel(interceptor);
    619     assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
    620     assertEquals(1, atomic.get());
    621   }
    622 
    623   @Test
    624   public void callOptionsExecutor() {
    625     Metadata headers = new Metadata();
    626     ClientStream mockStream = mock(ClientStream.class);
    627     FakeClock callExecutor = new FakeClock();
    628     createChannel();
    629 
    630     // Start a call with a call executor
    631     CallOptions options =
    632         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
    633     ClientCall<String, Integer> call = channel.newCall(method, options);
    634     call.start(mockCallListener, headers);
    635 
    636     // Make the transport available
    637     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    638     verify(mockTransportFactory, never())
    639         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    640     subchannel.requestConnection();
    641     verify(mockTransportFactory)
    642         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
    643     MockClientTransportInfo transportInfo = transports.poll();
    644     ConnectionClientTransport mockTransport = transportInfo.transport;
    645     ManagedClientTransport.Listener transportListener = transportInfo.listener;
    646     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
    647         .thenReturn(mockStream);
    648     transportListener.transportReady();
    649     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
    650         .thenReturn(PickResult.withSubchannel(subchannel));
    651     assertEquals(0, callExecutor.numPendingTasks());
    652     helper.updateBalancingState(READY, mockPicker);
    653 
    654     // Real streams are started in the call executor if they were previously buffered.
    655     assertEquals(1, callExecutor.runDueTasks());
    656     verify(mockTransport).newStream(same(method), same(headers), same(options));
    657     verify(mockStream).start(streamListenerCaptor.capture());
    658 
    659     // Call listener callbacks are also run in the call executor
    660     ClientStreamListener streamListener = streamListenerCaptor.getValue();
    661     Metadata trailers = new Metadata();
    662     assertEquals(0, callExecutor.numPendingTasks());
    663     streamListener.closed(Status.CANCELLED, trailers);
    664     verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
    665     assertEquals(1, callExecutor.runDueTasks());
    666     verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
    667 
    668 
    669     transportListener.transportShutdown(Status.UNAVAILABLE);
    670     transportListener.transportTerminated();
    671 
    672     // Clean up as much as possible to allow the channel to terminate.
    673     subchannel.shutdown();
    674     timer.forwardNanos(
    675         TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
    676   }
    677 
    678   @Test
    679   public void nameResolutionFailed() {
    680     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
    681     FakeNameResolverFactory nameResolverFactory =
    682         new FakeNameResolverFactory.Builder(expectedUri)
    683             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
    684             .setError(error)
    685             .build();
    686     channelBuilder.nameResolverFactory(nameResolverFactory);
    687     // Name resolution is started as soon as channel is created.
    688     createChannel();
    689     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
    690     verify(mockLoadBalancer).handleNameResolutionError(same(error));
    691     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
    692 
    693     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
    694     assertEquals(0, resolver.refreshCalled);
    695 
    696     timer.forwardNanos(1);
    697     assertEquals(1, resolver.refreshCalled);
    698     verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
    699 
    700     // Verify an additional name resolution failure does not schedule another timer
    701     resolver.refresh();
    702     verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
    703     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
    704 
    705     // Allow the next refresh attempt to succeed
    706     resolver.error = null;
    707 
    708     // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
    709     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
    710     assertEquals(2, resolver.refreshCalled);
    711     timer.forwardNanos(1);
    712     assertEquals(3, resolver.refreshCalled);
    713     assertEquals(0, timer.numPendingTasks());
    714 
    715     // Verify that the successful resolution reset the backoff policy
    716     resolver.listener.onError(error);
    717     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
    718     assertEquals(3, resolver.refreshCalled);
    719     timer.forwardNanos(1);
    720     assertEquals(4, resolver.refreshCalled);
    721     assertEquals(0, timer.numPendingTasks());
    722   }
    723 
    724   @Test
    725   public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
    726     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
    727 
    728     FakeNameResolverFactory nameResolverFactory =
    729         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
    730     channelBuilder.nameResolverFactory(nameResolverFactory);
    731     // Name resolution is started as soon as channel is created.
    732     createChannel();
    733     verify(mockLoadBalancer).handleNameResolutionError(same(error));
    734 
    735     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
    736     assertNotNull(nameResolverBackoff);
    737     assertFalse(nameResolverBackoff.isCancelled());
    738 
    739     // Add a pending call to the delayed transport
    740     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    741     Metadata headers = new Metadata();
    742     call.start(mockCallListener, headers);
    743 
    744     // The pending call on the delayed transport stops the name resolver backoff from cancelling
    745     channel.shutdown();
    746     assertFalse(nameResolverBackoff.isCancelled());
    747 
    748     // Notify that a subchannel is ready, which drains the delayed transport
    749     SubchannelPicker picker = mock(SubchannelPicker.class);
    750     Status status = Status.UNAVAILABLE.withDescription("for test");
    751     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
    752         .thenReturn(PickResult.withDrop(status));
    753     helper.updateBalancingState(READY, picker);
    754     executor.runDueTasks();
    755     verify(mockCallListener).onClose(same(status), any(Metadata.class));
    756 
    757     assertTrue(nameResolverBackoff.isCancelled());
    758   }
    759 
    760   @Test
    761   public void nameResolverReturnsEmptySubLists() {
    762     String errorDescription = "NameResolver returned an empty list";
    763 
    764     // Pass a FakeNameResolverFactory with an empty list
    765     createChannel();
    766 
    767     // LoadBalancer received the error
    768     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
    769     verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
    770     Status status = statusCaptor.getValue();
    771     assertSame(Status.Code.UNAVAILABLE, status.getCode());
    772     assertEquals(errorDescription, status.getDescription());
    773   }
    774 
    775   @Test
    776   public void loadBalancerThrowsInHandleResolvedAddresses() {
    777     RuntimeException ex = new RuntimeException("simulated");
    778     // Delay the success of name resolution until allResolved() is called
    779     FakeNameResolverFactory nameResolverFactory =
    780         new FakeNameResolverFactory.Builder(expectedUri)
    781             .setResolvedAtStart(false)
    782             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
    783             .build();
    784     channelBuilder.nameResolverFactory(nameResolverFactory);
    785     createChannel();
    786 
    787     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
    788     doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups(
    789         Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class));
    790 
    791     // NameResolver returns addresses.
    792     nameResolverFactory.allResolved();
    793 
    794     // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode.
    795     verifyPanicMode(ex);
    796   }
    797 
    798   @Test
    799   public void nameResolvedAfterChannelShutdown() {
    800     // Delay the success of name resolution until allResolved() is called.
    801     FakeNameResolverFactory nameResolverFactory =
    802         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
    803     channelBuilder.nameResolverFactory(nameResolverFactory);
    804     createChannel();
    805 
    806     channel.shutdown();
    807 
    808     assertTrue(channel.isShutdown());
    809     assertTrue(channel.isTerminated());
    810     verify(mockLoadBalancer).shutdown();
    811     // Name resolved after the channel is shut down, which is possible if the name resolution takes
    812     // time and is not cancellable. The resolved address will be dropped.
    813     nameResolverFactory.allResolved();
    814     verifyNoMoreInteractions(mockLoadBalancer);
    815   }
    816 
    817   /**
    818    * Verify that if the first resolved address points to a server that cannot be connected, the call
    819    * will end up with the second address which works.
    820    */
    821   @Test
    822   public void firstResolvedServerFailedToConnect() throws Exception {
    823     final SocketAddress goodAddress = new SocketAddress() {
    824         @Override public String toString() {
    825           return "goodAddress";
    826         }
    827       };
    828     final SocketAddress badAddress = new SocketAddress() {
    829         @Override public String toString() {
    830           return "badAddress";
    831         }
    832       };
    833     InOrder inOrder = inOrder(mockLoadBalancer);
    834 
    835     List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
    836     FakeNameResolverFactory nameResolverFactory =
    837         new FakeNameResolverFactory.Builder(expectedUri)
    838             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
    839             .build();
    840     channelBuilder.nameResolverFactory(nameResolverFactory);
    841     createChannel();
    842 
    843     // Start the call
    844     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    845     Metadata headers = new Metadata();
    846     call.start(mockCallListener, headers);
    847     executor.runDueTasks();
    848 
    849     // Simulate name resolution results
    850     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
    851     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
    852         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
    853     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    854     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
    855         .thenReturn(PickResult.withSubchannel(subchannel));
    856     subchannel.requestConnection();
    857     inOrder.verify(mockLoadBalancer).handleSubchannelState(
    858         same(subchannel), stateInfoCaptor.capture());
    859     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
    860 
    861     // The channel will starts with the first address (badAddress)
    862     verify(mockTransportFactory)
    863         .newClientTransport(same(badAddress), any(ClientTransportOptions.class));
    864     verify(mockTransportFactory, times(0))
    865         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
    866 
    867     MockClientTransportInfo badTransportInfo = transports.poll();
    868     // Which failed to connect
    869     badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
    870     inOrder.verifyNoMoreInteractions();
    871 
    872     // The channel then try the second address (goodAddress)
    873     verify(mockTransportFactory)
    874         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
    875     MockClientTransportInfo goodTransportInfo = transports.poll();
    876     when(goodTransportInfo.transport.newStream(
    877             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
    878         .thenReturn(mock(ClientStream.class));
    879 
    880     goodTransportInfo.listener.transportReady();
    881     inOrder.verify(mockLoadBalancer).handleSubchannelState(
    882         same(subchannel), stateInfoCaptor.capture());
    883     assertEquals(READY, stateInfoCaptor.getValue().getState());
    884 
    885     // A typical LoadBalancer will call this once the subchannel becomes READY
    886     helper.updateBalancingState(READY, mockPicker);
    887     // Delayed transport uses the app executor to create real streams.
    888     executor.runDueTasks();
    889 
    890     verify(goodTransportInfo.transport).newStream(same(method), same(headers),
    891         same(CallOptions.DEFAULT));
    892     // The bad transport was never used.
    893     verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
    894         any(Metadata.class), any(CallOptions.class));
    895   }
    896 
    897   @Test
    898   public void failFastRpcFailFromErrorFromBalancer() {
    899     subtestFailRpcFromBalancer(false, false, true);
    900   }
    901 
    902   @Test
    903   public void failFastRpcFailFromDropFromBalancer() {
    904     subtestFailRpcFromBalancer(false, true, true);
    905   }
    906 
    907   @Test
    908   public void waitForReadyRpcImmuneFromErrorFromBalancer() {
    909     subtestFailRpcFromBalancer(true, false, false);
    910   }
    911 
    912   @Test
    913   public void waitForReadyRpcFailFromDropFromBalancer() {
    914     subtestFailRpcFromBalancer(true, true, true);
    915   }
    916 
    917   private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
    918     createChannel();
    919 
    920     // This call will be buffered by the channel, thus involve delayed transport
    921     CallOptions callOptions = CallOptions.DEFAULT;
    922     if (waitForReady) {
    923       callOptions = callOptions.withWaitForReady();
    924     } else {
    925       callOptions = callOptions.withoutWaitForReady();
    926     }
    927     ClientCall<String, Integer> call1 = channel.newCall(method, callOptions);
    928     call1.start(mockCallListener, new Metadata());
    929 
    930     SubchannelPicker picker = mock(SubchannelPicker.class);
    931     Status status = Status.UNAVAILABLE.withDescription("for test");
    932 
    933     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
    934         .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status));
    935     helper.updateBalancingState(READY, picker);
    936 
    937     executor.runDueTasks();
    938     if (shouldFail) {
    939       verify(mockCallListener).onClose(same(status), any(Metadata.class));
    940     } else {
    941       verifyZeroInteractions(mockCallListener);
    942     }
    943 
    944     // This call doesn't involve delayed transport
    945     ClientCall<String, Integer> call2 = channel.newCall(method, callOptions);
    946     call2.start(mockCallListener2, new Metadata());
    947 
    948     executor.runDueTasks();
    949     if (shouldFail) {
    950       verify(mockCallListener2).onClose(same(status), any(Metadata.class));
    951     } else {
    952       verifyZeroInteractions(mockCallListener2);
    953     }
    954   }
    955 
    956   /**
    957    * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
    958    * wait-for-ready call will still be buffered.
    959    */
    960   @Test
    961   public void allServersFailedToConnect() throws Exception {
    962     final SocketAddress addr1 = new SocketAddress() {
    963         @Override public String toString() {
    964           return "addr1";
    965         }
    966       };
    967     final SocketAddress addr2 = new SocketAddress() {
    968         @Override public String toString() {
    969           return "addr2";
    970         }
    971       };
    972     InOrder inOrder = inOrder(mockLoadBalancer);
    973 
    974     List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
    975 
    976     FakeNameResolverFactory nameResolverFactory =
    977         new FakeNameResolverFactory.Builder(expectedUri)
    978             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
    979             .build();
    980     channelBuilder.nameResolverFactory(nameResolverFactory);
    981     createChannel();
    982 
    983     // Start a wait-for-ready call
    984     ClientCall<String, Integer> call =
    985         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
    986     Metadata headers = new Metadata();
    987     call.start(mockCallListener, headers);
    988     // ... and a fail-fast call
    989     ClientCall<String, Integer> call2 =
    990         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
    991     call2.start(mockCallListener2, headers);
    992     executor.runDueTasks();
    993 
    994     // Simulate name resolution results
    995     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
    996     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
    997         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
    998     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    999     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1000         .thenReturn(PickResult.withSubchannel(subchannel));
   1001     subchannel.requestConnection();
   1002 
   1003     inOrder.verify(mockLoadBalancer).handleSubchannelState(
   1004         same(subchannel), stateInfoCaptor.capture());
   1005     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
   1006 
   1007     // Connecting to server1, which will fail
   1008     verify(mockTransportFactory)
   1009         .newClientTransport(same(addr1), any(ClientTransportOptions.class));
   1010     verify(mockTransportFactory, times(0))
   1011         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
   1012     MockClientTransportInfo transportInfo1 = transports.poll();
   1013     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
   1014 
   1015     // Connecting to server2, which will fail too
   1016     verify(mockTransportFactory)
   1017         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
   1018     MockClientTransportInfo transportInfo2 = transports.poll();
   1019     Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
   1020     transportInfo2.listener.transportShutdown(server2Error);
   1021 
   1022     // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
   1023     // to LoadBalancer.
   1024     inOrder.verify(mockLoadBalancer).handleSubchannelState(
   1025         same(subchannel), stateInfoCaptor.capture());
   1026     assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
   1027     assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
   1028 
   1029     // A typical LoadBalancer would create a picker with error
   1030     SubchannelPicker picker2 = mock(SubchannelPicker.class);
   1031     when(picker2.pickSubchannel(any(PickSubchannelArgs.class)))
   1032         .thenReturn(PickResult.withError(server2Error));
   1033     helper.updateBalancingState(TRANSIENT_FAILURE, picker2);
   1034     executor.runDueTasks();
   1035 
   1036     // ... which fails the fail-fast call
   1037     verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
   1038     // ... while the wait-for-ready call stays
   1039     verifyNoMoreInteractions(mockCallListener);
   1040     // No real stream was ever created
   1041     verify(transportInfo1.transport, times(0))
   1042         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   1043     verify(transportInfo2.transport, times(0))
   1044         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   1045   }
   1046 
   1047   @Test
   1048   public void subchannels() {
   1049     createChannel();
   1050 
   1051     // createSubchannel() always return a new Subchannel
   1052     Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
   1053     Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
   1054     Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1);
   1055     Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2);
   1056     assertNotSame(sub1, sub2);
   1057     assertNotSame(attrs1, attrs2);
   1058     assertSame(attrs1, sub1.getAttributes());
   1059     assertSame(attrs2, sub2.getAttributes());
   1060     assertSame(addressGroup, sub1.getAddresses());
   1061     assertSame(addressGroup, sub2.getAddresses());
   1062 
   1063     // requestConnection()
   1064     verify(mockTransportFactory, never())
   1065         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
   1066     sub1.requestConnection();
   1067     verify(mockTransportFactory).newClientTransport(socketAddress, clientTransportOptions);
   1068     MockClientTransportInfo transportInfo1 = transports.poll();
   1069     assertNotNull(transportInfo1);
   1070 
   1071     sub2.requestConnection();
   1072     verify(mockTransportFactory, times(2))
   1073         .newClientTransport(socketAddress, clientTransportOptions);
   1074     MockClientTransportInfo transportInfo2 = transports.poll();
   1075     assertNotNull(transportInfo2);
   1076 
   1077     sub1.requestConnection();
   1078     sub2.requestConnection();
   1079     verify(mockTransportFactory, times(2))
   1080         .newClientTransport(socketAddress, clientTransportOptions);
   1081 
   1082     // shutdown() has a delay
   1083     sub1.shutdown();
   1084     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
   1085     sub1.shutdown();
   1086     verify(transportInfo1.transport, never()).shutdown(any(Status.class));
   1087     timer.forwardTime(1, TimeUnit.SECONDS);
   1088     verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS));
   1089 
   1090     // ... but not after Channel is terminating
   1091     verify(mockLoadBalancer, never()).shutdown();
   1092     channel.shutdown();
   1093     verify(mockLoadBalancer).shutdown();
   1094     verify(transportInfo2.transport, never()).shutdown(any(Status.class));
   1095 
   1096     sub2.shutdown();
   1097     verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
   1098 
   1099     // Cleanup
   1100     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
   1101     transportInfo1.listener.transportTerminated();
   1102     transportInfo2.listener.transportShutdown(Status.UNAVAILABLE);
   1103     transportInfo2.listener.transportTerminated();
   1104     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
   1105   }
   1106 
   1107   @Test
   1108   public void subchannelsWhenChannelShutdownNow() {
   1109     createChannel();
   1110     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1111     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1112     sub1.requestConnection();
   1113     sub2.requestConnection();
   1114 
   1115     assertEquals(2, transports.size());
   1116     MockClientTransportInfo ti1 = transports.poll();
   1117     MockClientTransportInfo ti2 = transports.poll();
   1118 
   1119     ti1.listener.transportReady();
   1120     ti2.listener.transportReady();
   1121 
   1122     channel.shutdownNow();
   1123     verify(ti1.transport).shutdownNow(any(Status.class));
   1124     verify(ti2.transport).shutdownNow(any(Status.class));
   1125 
   1126     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
   1127     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
   1128     ti1.listener.transportTerminated();
   1129 
   1130     assertFalse(channel.isTerminated());
   1131     ti2.listener.transportTerminated();
   1132     assertTrue(channel.isTerminated());
   1133   }
   1134 
   1135   @Test
   1136   public void subchannelsNoConnectionShutdown() {
   1137     createChannel();
   1138     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1139     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1140 
   1141     channel.shutdown();
   1142     verify(mockLoadBalancer).shutdown();
   1143     sub1.shutdown();
   1144     assertFalse(channel.isTerminated());
   1145     sub2.shutdown();
   1146     assertTrue(channel.isTerminated());
   1147     verify(mockTransportFactory, never())
   1148         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
   1149   }
   1150 
   1151   @Test
   1152   public void subchannelsNoConnectionShutdownNow() {
   1153     createChannel();
   1154     helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1155     helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1156     channel.shutdownNow();
   1157 
   1158     verify(mockLoadBalancer).shutdown();
   1159     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
   1160     // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
   1161     assertTrue(channel.isTerminated());
   1162     verify(mockTransportFactory, never())
   1163         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
   1164   }
   1165 
   1166   @Test
   1167   public void oobchannels() {
   1168     createChannel();
   1169 
   1170     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
   1171     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
   1172     verify(oobExecutorPool, times(2)).getObject();
   1173 
   1174     assertEquals("oob1authority", oob1.authority());
   1175     assertEquals("oob2authority", oob2.authority());
   1176 
   1177     // OOB channels create connections lazily.  A new call will initiate the connection.
   1178     Metadata headers = new Metadata();
   1179     ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
   1180     call.start(mockCallListener, headers);
   1181     verify(mockTransportFactory)
   1182         .newClientTransport(
   1183             socketAddress,
   1184             new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
   1185     MockClientTransportInfo transportInfo = transports.poll();
   1186     assertNotNull(transportInfo);
   1187 
   1188     assertEquals(0, oobExecutor.numPendingTasks());
   1189     transportInfo.listener.transportReady();
   1190     assertEquals(1, oobExecutor.runDueTasks());
   1191     verify(transportInfo.transport).newStream(same(method), same(headers),
   1192         same(CallOptions.DEFAULT));
   1193 
   1194     // The transport goes away
   1195     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
   1196     transportInfo.listener.transportTerminated();
   1197 
   1198     // A new call will trigger a new transport
   1199     ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
   1200     call2.start(mockCallListener2, headers);
   1201     ClientCall<String, Integer> call3 =
   1202         oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
   1203     call3.start(mockCallListener3, headers);
   1204     verify(mockTransportFactory, times(2)).newClientTransport(
   1205         socketAddress,
   1206         new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
   1207     transportInfo = transports.poll();
   1208     assertNotNull(transportInfo);
   1209 
   1210     // This transport fails
   1211     Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
   1212     assertEquals(0, oobExecutor.numPendingTasks());
   1213     transportInfo.listener.transportShutdown(transportError);
   1214     assertTrue(oobExecutor.runDueTasks() > 0);
   1215 
   1216     // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
   1217     verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
   1218     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
   1219 
   1220     // Shutdown
   1221     assertFalse(oob1.isShutdown());
   1222     assertFalse(oob2.isShutdown());
   1223     oob1.shutdown();
   1224     verify(oobExecutorPool, never()).returnObject(anyObject());
   1225     oob2.shutdownNow();
   1226     assertTrue(oob1.isShutdown());
   1227     assertTrue(oob2.isShutdown());
   1228     assertTrue(oob2.isTerminated());
   1229     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
   1230 
   1231     // New RPCs will be rejected.
   1232     assertEquals(0, oobExecutor.numPendingTasks());
   1233     ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
   1234     ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
   1235     call4.start(mockCallListener4, headers);
   1236     call5.start(mockCallListener5, headers);
   1237     assertTrue(oobExecutor.runDueTasks() > 0);
   1238     verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
   1239     Status status4 = statusCaptor.getValue();
   1240     assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
   1241     verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
   1242     Status status5 = statusCaptor.getValue();
   1243     assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
   1244 
   1245     // The pending RPC will still be pending
   1246     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
   1247 
   1248     // This will shutdownNow() the delayed transport, terminating the pending RPC
   1249     assertEquals(0, oobExecutor.numPendingTasks());
   1250     oob1.shutdownNow();
   1251     assertTrue(oobExecutor.runDueTasks() > 0);
   1252     verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
   1253 
   1254     // Shut down the channel, and it will not terminated because OOB channel has not.
   1255     channel.shutdown();
   1256     assertFalse(channel.isTerminated());
   1257     // Delayed transport has already terminated.  Terminating the transport terminates the
   1258     // subchannel, which in turn terimates the OOB channel, which terminates the channel.
   1259     assertFalse(oob1.isTerminated());
   1260     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
   1261     transportInfo.listener.transportTerminated();
   1262     assertTrue(oob1.isTerminated());
   1263     assertTrue(channel.isTerminated());
   1264     verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
   1265   }
   1266 
   1267   @Test
   1268   public void oobChannelsWhenChannelShutdownNow() {
   1269     createChannel();
   1270     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
   1271     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
   1272 
   1273     oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
   1274     oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
   1275 
   1276     assertEquals(2, transports.size());
   1277     MockClientTransportInfo ti1 = transports.poll();
   1278     MockClientTransportInfo ti2 = transports.poll();
   1279 
   1280     ti1.listener.transportReady();
   1281     ti2.listener.transportReady();
   1282 
   1283     channel.shutdownNow();
   1284     verify(ti1.transport).shutdownNow(any(Status.class));
   1285     verify(ti2.transport).shutdownNow(any(Status.class));
   1286 
   1287     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
   1288     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
   1289     ti1.listener.transportTerminated();
   1290 
   1291     assertFalse(channel.isTerminated());
   1292     ti2.listener.transportTerminated();
   1293     assertTrue(channel.isTerminated());
   1294   }
   1295 
   1296   @Test
   1297   public void oobChannelsNoConnectionShutdown() {
   1298     createChannel();
   1299     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
   1300     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
   1301     channel.shutdown();
   1302 
   1303     verify(mockLoadBalancer).shutdown();
   1304     oob1.shutdown();
   1305     assertTrue(oob1.isTerminated());
   1306     assertFalse(channel.isTerminated());
   1307     oob2.shutdown();
   1308     assertTrue(oob2.isTerminated());
   1309     assertTrue(channel.isTerminated());
   1310     verify(mockTransportFactory, never())
   1311         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
   1312   }
   1313 
   1314   @Test
   1315   public void oobChannelsNoConnectionShutdownNow() {
   1316     createChannel();
   1317     helper.createOobChannel(addressGroup, "oob1Authority");
   1318     helper.createOobChannel(addressGroup, "oob2Authority");
   1319     channel.shutdownNow();
   1320 
   1321     verify(mockLoadBalancer).shutdown();
   1322     assertTrue(channel.isTerminated());
   1323     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
   1324     // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels.
   1325     verify(mockTransportFactory, never())
   1326         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
   1327   }
   1328 
   1329   @Test
   1330   public void refreshNameResolutionWhenSubchannelConnectionFailed() {
   1331     subtestRefreshNameResolutionWhenConnectionFailed(false);
   1332   }
   1333 
   1334   @Test
   1335   public void refreshNameResolutionWhenOobChannelConnectionFailed() {
   1336     subtestRefreshNameResolutionWhenConnectionFailed(true);
   1337   }
   1338 
   1339   private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) {
   1340     FakeNameResolverFactory nameResolverFactory =
   1341         new FakeNameResolverFactory.Builder(expectedUri)
   1342             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   1343             .build();
   1344     channelBuilder.nameResolverFactory(nameResolverFactory);
   1345     createChannel();
   1346     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
   1347 
   1348     if (isOobChannel) {
   1349       OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority");
   1350       oobChannel.getSubchannel().requestConnection();
   1351     } else {
   1352       Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1353       subchannel.requestConnection();
   1354     }
   1355 
   1356     MockClientTransportInfo transportInfo = transports.poll();
   1357     assertNotNull(transportInfo);
   1358 
   1359     // Transport closed when connecting
   1360     assertEquals(0, resolver.refreshCalled);
   1361     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
   1362     assertEquals(1, resolver.refreshCalled);
   1363 
   1364     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
   1365     transportInfo = transports.poll();
   1366     assertNotNull(transportInfo);
   1367 
   1368     transportInfo.listener.transportReady();
   1369 
   1370     // Transport closed when ready
   1371     assertEquals(1, resolver.refreshCalled);
   1372     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
   1373     assertEquals(2, resolver.refreshCalled);
   1374   }
   1375 
   1376   @Test
   1377   public void uriPattern() {
   1378     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches());
   1379     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
   1380     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
   1381     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
   1382     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
   1383     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched
   1384   }
   1385 
   1386   /**
   1387    * Test that information such as the Call's context, MethodDescriptor, authority, executor are
   1388    * propagated to newStream() and applyRequestMetadata().
   1389    */
   1390   @Test
   1391   public void informationPropagatedToNewStreamAndCallCredentials() {
   1392     createChannel();
   1393     CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
   1394     final Context.Key<String> testKey = Context.key("testing");
   1395     Context ctx = Context.current().withValue(testKey, "testValue");
   1396     final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
   1397     final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
   1398     doAnswer(new Answer<Void>() {
   1399         @Override
   1400         public Void answer(InvocationOnMock in) throws Throwable {
   1401           credsApplyContexts.add(Context.current());
   1402           return null;
   1403         }
   1404       }).when(creds).applyRequestMetadata(
   1405           any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
   1406           any(MetadataApplier.class));
   1407 
   1408     // First call will be on delayed transport.  Only newCall() is run within the expected context,
   1409     // so that we can verify that the context is explicitly attached before calling newStream() and
   1410     // applyRequestMetadata(), which happens after we detach the context from the thread.
   1411     Context origCtx = ctx.attach();
   1412     assertEquals("testValue", testKey.get());
   1413     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
   1414     ctx.detach(origCtx);
   1415     assertNull(testKey.get());
   1416     call.start(mockCallListener, new Metadata());
   1417 
   1418     // Simulate name resolution results
   1419     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
   1420     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1421     subchannel.requestConnection();
   1422     verify(mockTransportFactory)
   1423         .newClientTransport(same(socketAddress), eq(clientTransportOptions));
   1424     MockClientTransportInfo transportInfo = transports.poll();
   1425     final ConnectionClientTransport transport = transportInfo.transport;
   1426     when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
   1427     doAnswer(new Answer<ClientStream>() {
   1428         @Override
   1429         public ClientStream answer(InvocationOnMock in) throws Throwable {
   1430           newStreamContexts.add(Context.current());
   1431           return mock(ClientStream.class);
   1432         }
   1433       }).when(transport).newStream(
   1434           any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   1435 
   1436     verify(creds, never()).applyRequestMetadata(
   1437         any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
   1438         any(MetadataApplier.class));
   1439 
   1440     // applyRequestMetadata() is called after the transport becomes ready.
   1441     transportInfo.listener.transportReady();
   1442     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1443         .thenReturn(PickResult.withSubchannel(subchannel));
   1444     helper.updateBalancingState(READY, mockPicker);
   1445     executor.runDueTasks();
   1446     ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
   1447     ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
   1448     verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
   1449         same(executor.getScheduledExecutorService()), applierCaptor.capture());
   1450     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
   1451     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
   1452     assertEquals(SecurityLevel.NONE,
   1453         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
   1454     verify(transport, never()).newStream(
   1455         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   1456 
   1457     // newStream() is called after apply() is called
   1458     applierCaptor.getValue().apply(new Metadata());
   1459     verify(transport).newStream(same(method), any(Metadata.class), same(callOptions));
   1460     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
   1461     // The context should not live beyond the scope of newStream() and applyRequestMetadata()
   1462     assertNull(testKey.get());
   1463 
   1464 
   1465     // Second call will not be on delayed transport
   1466     origCtx = ctx.attach();
   1467     call = channel.newCall(method, callOptions);
   1468     ctx.detach(origCtx);
   1469     call.start(mockCallListener, new Metadata());
   1470 
   1471     verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
   1472         same(executor.getScheduledExecutorService()), applierCaptor.capture());
   1473     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
   1474     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
   1475     assertEquals(SecurityLevel.NONE,
   1476         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
   1477     // This is from the first call
   1478     verify(transport).newStream(
   1479         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   1480 
   1481     // Still, newStream() is called after apply() is called
   1482     applierCaptor.getValue().apply(new Metadata());
   1483     verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions));
   1484     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
   1485 
   1486     assertNull(testKey.get());
   1487   }
   1488 
   1489   @Test
   1490   public void pickerReturnsStreamTracer_noDelay() {
   1491     ClientStream mockStream = mock(ClientStream.class);
   1492     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
   1493     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
   1494     createChannel();
   1495     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1496     subchannel.requestConnection();
   1497     MockClientTransportInfo transportInfo = transports.poll();
   1498     transportInfo.listener.transportReady();
   1499     ClientTransport mockTransport = transportInfo.transport;
   1500     when(mockTransport.newStream(
   1501             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
   1502         .thenReturn(mockStream);
   1503 
   1504     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
   1505         PickResult.withSubchannel(subchannel, factory2));
   1506     helper.updateBalancingState(READY, mockPicker);
   1507 
   1508     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
   1509     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
   1510     call.start(mockCallListener, new Metadata());
   1511 
   1512     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
   1513     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
   1514     assertEquals(
   1515         Arrays.asList(factory1, factory2),
   1516         callOptionsCaptor.getValue().getStreamTracerFactories());
   1517     // The factories are safely not stubbed because we do not expect any usage of them.
   1518     verifyZeroInteractions(factory1);
   1519     verifyZeroInteractions(factory2);
   1520   }
   1521 
   1522   @Test
   1523   public void pickerReturnsStreamTracer_delayed() {
   1524     ClientStream mockStream = mock(ClientStream.class);
   1525     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
   1526     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
   1527     createChannel();
   1528 
   1529     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
   1530     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
   1531     call.start(mockCallListener, new Metadata());
   1532 
   1533     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1534     subchannel.requestConnection();
   1535     MockClientTransportInfo transportInfo = transports.poll();
   1536     transportInfo.listener.transportReady();
   1537     ClientTransport mockTransport = transportInfo.transport;
   1538     when(mockTransport.newStream(
   1539             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
   1540         .thenReturn(mockStream);
   1541     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
   1542         PickResult.withSubchannel(subchannel, factory2));
   1543 
   1544     helper.updateBalancingState(READY, mockPicker);
   1545     assertEquals(1, executor.runDueTasks());
   1546 
   1547     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
   1548     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
   1549     assertEquals(
   1550         Arrays.asList(factory1, factory2),
   1551         callOptionsCaptor.getValue().getStreamTracerFactories());
   1552     // The factories are safely not stubbed because we do not expect any usage of them.
   1553     verifyZeroInteractions(factory1);
   1554     verifyZeroInteractions(factory2);
   1555   }
   1556 
   1557   @Test
   1558   public void getState_loadBalancerSupportsChannelState() {
   1559     channelBuilder.nameResolverFactory(
   1560         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   1561     createChannel();
   1562     assertEquals(IDLE, channel.getState(false));
   1563 
   1564     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
   1565     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
   1566   }
   1567 
   1568   @Test
   1569   public void getState_withRequestConnect() {
   1570     channelBuilder.nameResolverFactory(
   1571         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   1572     requestConnection = false;
   1573     createChannel();
   1574 
   1575     assertEquals(IDLE, channel.getState(false));
   1576     verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));
   1577 
   1578     // call getState() with requestConnection = true
   1579     assertEquals(IDLE, channel.getState(true));
   1580     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
   1581     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
   1582     helper = helperCaptor.getValue();
   1583 
   1584     helper.updateBalancingState(CONNECTING, mockPicker);
   1585     assertEquals(CONNECTING, channel.getState(false));
   1586     assertEquals(CONNECTING, channel.getState(true));
   1587     verifyNoMoreInteractions(mockLoadBalancerFactory);
   1588   }
   1589 
   1590   @Test
   1591   public void getState_withRequestConnect_IdleWithLbRunning() {
   1592     channelBuilder.nameResolverFactory(
   1593         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   1594     createChannel();
   1595     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
   1596 
   1597     helper.updateBalancingState(IDLE, mockPicker);
   1598 
   1599     assertEquals(IDLE, channel.getState(true));
   1600     verifyNoMoreInteractions(mockLoadBalancerFactory);
   1601     verify(mockPicker).requestConnection();
   1602   }
   1603 
   1604   @Test
   1605   public void notifyWhenStateChanged() {
   1606     final AtomicBoolean stateChanged = new AtomicBoolean();
   1607     Runnable onStateChanged = new Runnable() {
   1608       @Override
   1609       public void run() {
   1610         stateChanged.set(true);
   1611       }
   1612     };
   1613 
   1614     channelBuilder.nameResolverFactory(
   1615         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   1616     createChannel();
   1617     assertEquals(IDLE, channel.getState(false));
   1618 
   1619     channel.notifyWhenStateChanged(IDLE, onStateChanged);
   1620     executor.runDueTasks();
   1621     assertFalse(stateChanged.get());
   1622 
   1623     // state change from IDLE to CONNECTING
   1624     helper.updateBalancingState(CONNECTING, mockPicker);
   1625     // onStateChanged callback should run
   1626     executor.runDueTasks();
   1627     assertTrue(stateChanged.get());
   1628 
   1629     // clear and test form CONNECTING
   1630     stateChanged.set(false);
   1631     channel.notifyWhenStateChanged(IDLE, onStateChanged);
   1632     // onStateChanged callback should run immediately
   1633     executor.runDueTasks();
   1634     assertTrue(stateChanged.get());
   1635   }
   1636 
   1637   @Test
   1638   public void channelStateWhenChannelShutdown() {
   1639     final AtomicBoolean stateChanged = new AtomicBoolean();
   1640     Runnable onStateChanged = new Runnable() {
   1641       @Override
   1642       public void run() {
   1643         stateChanged.set(true);
   1644       }
   1645     };
   1646 
   1647     channelBuilder.nameResolverFactory(
   1648         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   1649     createChannel();
   1650     assertEquals(IDLE, channel.getState(false));
   1651     channel.notifyWhenStateChanged(IDLE, onStateChanged);
   1652     executor.runDueTasks();
   1653     assertFalse(stateChanged.get());
   1654 
   1655     channel.shutdown();
   1656     assertEquals(SHUTDOWN, channel.getState(false));
   1657     executor.runDueTasks();
   1658     assertTrue(stateChanged.get());
   1659 
   1660     stateChanged.set(false);
   1661     channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged);
   1662     helper.updateBalancingState(CONNECTING, mockPicker);
   1663 
   1664     assertEquals(SHUTDOWN, channel.getState(false));
   1665     executor.runDueTasks();
   1666     assertFalse(stateChanged.get());
   1667   }
   1668 
   1669   @Test
   1670   public void stateIsIdleOnIdleTimeout() {
   1671     long idleTimeoutMillis = 2000L;
   1672     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   1673     createChannel();
   1674     assertEquals(IDLE, channel.getState(false));
   1675 
   1676     helper.updateBalancingState(CONNECTING, mockPicker);
   1677     assertEquals(CONNECTING, channel.getState(false));
   1678 
   1679     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
   1680     assertEquals(IDLE, channel.getState(false));
   1681   }
   1682 
   1683   @Test
   1684   public void panic_whenIdle() {
   1685     subtestPanic(IDLE);
   1686   }
   1687 
   1688   @Test
   1689   public void panic_whenConnecting() {
   1690     subtestPanic(CONNECTING);
   1691   }
   1692 
   1693   @Test
   1694   public void panic_whenTransientFailure() {
   1695     subtestPanic(TRANSIENT_FAILURE);
   1696   }
   1697 
   1698   @Test
   1699   public void panic_whenReady() {
   1700     subtestPanic(READY);
   1701   }
   1702 
   1703   private void subtestPanic(ConnectivityState initialState) {
   1704     assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
   1705     long idleTimeoutMillis = 2000L;
   1706     FakeNameResolverFactory nameResolverFactory =
   1707         new FakeNameResolverFactory.Builder(expectedUri).build();
   1708     channelBuilder.nameResolverFactory(nameResolverFactory);
   1709     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   1710     createChannel();
   1711 
   1712     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
   1713     assertEquals(1, nameResolverFactory.resolvers.size());
   1714     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
   1715 
   1716     Throwable panicReason = new Exception("Simulated uncaught exception");
   1717     if (initialState == IDLE) {
   1718       timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
   1719     } else {
   1720       helper.updateBalancingState(initialState, mockPicker);
   1721     }
   1722     assertEquals(initialState, channel.getState(false));
   1723 
   1724     if (initialState == IDLE) {
   1725       // IDLE mode will shutdown resolver and balancer
   1726       verify(mockLoadBalancer).shutdown();
   1727       assertTrue(resolver.shutdown);
   1728       // A new resolver is created
   1729       assertEquals(1, nameResolverFactory.resolvers.size());
   1730       resolver = nameResolverFactory.resolvers.remove(0);
   1731       assertFalse(resolver.shutdown);
   1732     } else {
   1733       verify(mockLoadBalancer, never()).shutdown();
   1734       assertFalse(resolver.shutdown);
   1735     }
   1736 
   1737     // Make channel panic!
   1738     channel.panic(panicReason);
   1739 
   1740     // Calls buffered in delayedTransport will fail
   1741 
   1742     // Resolver and balancer are shutdown
   1743     verify(mockLoadBalancer).shutdown();
   1744     assertTrue(resolver.shutdown);
   1745 
   1746     // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
   1747     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
   1748     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
   1749     verifyPanicMode(panicReason);
   1750 
   1751     // No new resolver or balancer are created
   1752     verifyNoMoreInteractions(mockLoadBalancerFactory);
   1753     assertEquals(0, nameResolverFactory.resolvers.size());
   1754 
   1755     // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
   1756     // able to revive it.
   1757     helper.updateBalancingState(READY, mockPicker);
   1758     verifyPanicMode(panicReason);
   1759 
   1760     // Cannot be revived by exitIdleMode()
   1761     channel.exitIdleMode();
   1762     verifyPanicMode(panicReason);
   1763 
   1764     // Can still shutdown normally
   1765     channel.shutdown();
   1766     assertTrue(channel.isShutdown());
   1767     assertTrue(channel.isTerminated());
   1768     assertEquals(SHUTDOWN, channel.getState(false));
   1769 
   1770     // We didn't stub mockPicker, because it should have never been called in this test.
   1771     verifyZeroInteractions(mockPicker);
   1772   }
   1773 
   1774   @Test
   1775   public void panic_bufferedCallsWillFail() {
   1776     createChannel();
   1777 
   1778     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1779         .thenReturn(PickResult.withNoResult());
   1780     helper.updateBalancingState(CONNECTING, mockPicker);
   1781 
   1782     // Start RPCs that will be buffered in delayedTransport
   1783     ClientCall<String, Integer> call =
   1784         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
   1785     call.start(mockCallListener, new Metadata());
   1786 
   1787     ClientCall<String, Integer> call2 =
   1788         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
   1789     call2.start(mockCallListener2, new Metadata());
   1790 
   1791     executor.runDueTasks();
   1792     verifyZeroInteractions(mockCallListener, mockCallListener2);
   1793 
   1794     // Enter panic
   1795     Throwable panicReason = new Exception("Simulated uncaught exception");
   1796     channel.panic(panicReason);
   1797 
   1798     // Buffered RPCs fail immediately
   1799     executor.runDueTasks();
   1800     verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
   1801     verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
   1802   }
   1803 
   1804   private void verifyPanicMode(Throwable cause) {
   1805     Assume.assumeTrue("Panic mode disabled to resolve issues with some tests. See #3293", false);
   1806 
   1807     @SuppressWarnings("unchecked")
   1808     ClientCall.Listener<Integer> mockListener =
   1809         (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
   1810     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
   1811     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   1812     call.start(mockListener, new Metadata());
   1813     executor.runDueTasks();
   1814     verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause);
   1815 
   1816     // Channel is dead.  No more pending task to possibly revive it.
   1817     assertEquals(0, timer.numPendingTasks());
   1818     assertEquals(0, executor.numPendingTasks());
   1819     assertEquals(0, oobExecutor.numPendingTasks());
   1820   }
   1821 
   1822   private void verifyCallListenerClosed(
   1823       ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
   1824     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null);
   1825     verify(listener).onClose(captor.capture(), any(Metadata.class));
   1826     Status rpcStatus = captor.getValue();
   1827     assertEquals(code, rpcStatus.getCode());
   1828     assertSame(cause, rpcStatus.getCause());
   1829     verifyNoMoreInteractions(listener);
   1830   }
   1831 
   1832   @Test
   1833   public void idleTimeoutAndReconnect() {
   1834     long idleTimeoutMillis = 2000L;
   1835     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   1836     createChannel();
   1837 
   1838     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
   1839     assertEquals(IDLE, channel.getState(true /* request connection */));
   1840 
   1841     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
   1842     // Two times of requesting connection will create loadBalancer twice.
   1843     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
   1844     Helper helper2 = helperCaptor.getValue();
   1845 
   1846     // Updating on the old helper (whose balancer has been shutdown) does not change the channel
   1847     // state.
   1848     helper.updateBalancingState(CONNECTING, mockPicker);
   1849     assertEquals(IDLE, channel.getState(false));
   1850 
   1851     helper2.updateBalancingState(CONNECTING, mockPicker);
   1852     assertEquals(CONNECTING, channel.getState(false));
   1853   }
   1854 
   1855   @Test
   1856   public void idleMode_resetsDelayedTransportPicker() {
   1857     ClientStream mockStream = mock(ClientStream.class);
   1858     Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
   1859     long idleTimeoutMillis = 1000L;
   1860     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   1861     channelBuilder.nameResolverFactory(
   1862         new FakeNameResolverFactory.Builder(expectedUri)
   1863             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   1864             .build());
   1865     createChannel();
   1866     assertEquals(IDLE, channel.getState(false));
   1867 
   1868     // This call will be buffered in delayedTransport
   1869     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   1870     call.start(mockCallListener, new Metadata());
   1871 
   1872     // Move channel into TRANSIENT_FAILURE, which will fail the pending call
   1873     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1874         .thenReturn(PickResult.withError(pickError));
   1875     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
   1876     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
   1877     executor.runDueTasks();
   1878     verify(mockCallListener).onClose(same(pickError), any(Metadata.class));
   1879 
   1880     // Move channel to idle
   1881     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
   1882     assertEquals(IDLE, channel.getState(false));
   1883 
   1884     // This call should be buffered, but will move the channel out of idle
   1885     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
   1886     call2.start(mockCallListener2, new Metadata());
   1887     executor.runDueTasks();
   1888     verifyNoMoreInteractions(mockCallListener2);
   1889 
   1890     // Get the helper created on exiting idle
   1891     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
   1892     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
   1893     Helper helper2 = helperCaptor.getValue();
   1894 
   1895     // Establish a connection
   1896     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
   1897     subchannel.requestConnection();
   1898     MockClientTransportInfo transportInfo = transports.poll();
   1899     ConnectionClientTransport mockTransport = transportInfo.transport;
   1900     ManagedClientTransport.Listener transportListener = transportInfo.listener;
   1901     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
   1902         .thenReturn(mockStream);
   1903     transportListener.transportReady();
   1904 
   1905     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1906         .thenReturn(PickResult.withSubchannel(subchannel));
   1907     helper2.updateBalancingState(READY, mockPicker);
   1908     assertEquals(READY, channel.getState(false));
   1909     executor.runDueTasks();
   1910 
   1911     // Verify the buffered call was drained
   1912     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
   1913     verify(mockStream).start(any(ClientStreamListener.class));
   1914   }
   1915 
   1916   @Test
   1917   public void enterIdleEntersIdle() {
   1918     createChannel();
   1919     helper.updateBalancingState(READY, mockPicker);
   1920     assertEquals(READY, channel.getState(false));
   1921 
   1922     channel.enterIdle();
   1923 
   1924     assertEquals(IDLE, channel.getState(false));
   1925   }
   1926 
   1927   @Test
   1928   public void enterIdleAfterIdleTimerIsNoOp() {
   1929     long idleTimeoutMillis = 2000L;
   1930     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   1931     createChannel();
   1932     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
   1933     assertEquals(IDLE, channel.getState(false));
   1934 
   1935     channel.enterIdle();
   1936 
   1937     assertEquals(IDLE, channel.getState(false));
   1938   }
   1939 
   1940   @Test
   1941   public void enterIdle_exitsIdleIfDelayedStreamPending() {
   1942     FakeNameResolverFactory nameResolverFactory =
   1943         new FakeNameResolverFactory.Builder(expectedUri)
   1944             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   1945             .build();
   1946     channelBuilder.nameResolverFactory(nameResolverFactory);
   1947     createChannel();
   1948 
   1949     // Start a call that will be buffered in delayedTransport
   1950     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   1951     call.start(mockCallListener, new Metadata());
   1952 
   1953     // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed
   1954     // call
   1955     channel.enterIdle();
   1956     assertEquals(IDLE, channel.getState(false));
   1957 
   1958     // enterIdle() will restart the delayed call by exiting idle. This creates a new helper.
   1959     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
   1960     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
   1961     Helper helper2 = helperCaptor.getValue();
   1962 
   1963     // Establish a connection
   1964     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
   1965     subchannel.requestConnection();
   1966     ClientStream mockStream = mock(ClientStream.class);
   1967     MockClientTransportInfo transportInfo = transports.poll();
   1968     ConnectionClientTransport mockTransport = transportInfo.transport;
   1969     ManagedClientTransport.Listener transportListener = transportInfo.listener;
   1970     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
   1971         .thenReturn(mockStream);
   1972     transportListener.transportReady();
   1973     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   1974         .thenReturn(PickResult.withSubchannel(subchannel));
   1975     helper2.updateBalancingState(READY, mockPicker);
   1976     assertEquals(READY, channel.getState(false));
   1977 
   1978     // Verify the original call was drained
   1979     executor.runDueTasks();
   1980     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
   1981     verify(mockStream).start(any(ClientStreamListener.class));
   1982   }
   1983 
   1984   @Test
   1985   public void updateBalancingStateDoesUpdatePicker() {
   1986     ClientStream mockStream = mock(ClientStream.class);
   1987     createChannel();
   1988 
   1989     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   1990     call.start(mockCallListener, new Metadata());
   1991 
   1992     // Make the transport available with subchannel2
   1993     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1994     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   1995     subchannel2.requestConnection();
   1996 
   1997     MockClientTransportInfo transportInfo = transports.poll();
   1998     ConnectionClientTransport mockTransport = transportInfo.transport;
   1999     ManagedClientTransport.Listener transportListener = transportInfo.listener;
   2000     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
   2001         .thenReturn(mockStream);
   2002     transportListener.transportReady();
   2003 
   2004     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   2005         .thenReturn(PickResult.withSubchannel(subchannel1));
   2006     helper.updateBalancingState(READY, mockPicker);
   2007 
   2008     executor.runDueTasks();
   2009     verify(mockTransport, never())
   2010         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
   2011     verify(mockStream, never()).start(any(ClientStreamListener.class));
   2012 
   2013 
   2014     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   2015         .thenReturn(PickResult.withSubchannel(subchannel2));
   2016     helper.updateBalancingState(READY, mockPicker);
   2017 
   2018     executor.runDueTasks();
   2019     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
   2020     verify(mockStream).start(any(ClientStreamListener.class));
   2021   }
   2022 
   2023   @Test
   2024   public void updateBalancingStateWithShutdownShouldBeIgnored() {
   2025     channelBuilder.nameResolverFactory(
   2026         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
   2027     createChannel();
   2028     assertEquals(IDLE, channel.getState(false));
   2029 
   2030     Runnable onStateChanged = mock(Runnable.class);
   2031     channel.notifyWhenStateChanged(IDLE, onStateChanged);
   2032 
   2033     helper.updateBalancingState(SHUTDOWN, mockPicker);
   2034 
   2035     assertEquals(IDLE, channel.getState(false));
   2036     executor.runDueTasks();
   2037     verify(onStateChanged, never()).run();
   2038   }
   2039 
   2040   @Test
   2041   public void resetConnectBackoff() {
   2042     // Start with a name resolution failure to trigger backoff attempts
   2043     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
   2044     FakeNameResolverFactory nameResolverFactory =
   2045         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
   2046     channelBuilder.nameResolverFactory(nameResolverFactory);
   2047     // Name resolution is started as soon as channel is created.
   2048     createChannel();
   2049     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
   2050     verify(mockLoadBalancer).handleNameResolutionError(same(error));
   2051 
   2052     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
   2053     assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
   2054     assertEquals(0, resolver.refreshCalled);
   2055 
   2056     // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
   2057     channel.resetConnectBackoff();
   2058     assertEquals(1, resolver.refreshCalled);
   2059     assertTrue(nameResolverBackoff.isCancelled());
   2060 
   2061     // Simulate a race between cancel and the task scheduler. Should be a no-op.
   2062     nameResolverBackoff.command.run();
   2063     assertEquals(1, resolver.refreshCalled);
   2064 
   2065     // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
   2066     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
   2067     assertEquals(2, resolver.refreshCalled);
   2068   }
   2069 
   2070   @Test
   2071   public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
   2072     FakeNameResolverFactory nameResolverFactory =
   2073         new FakeNameResolverFactory.Builder(expectedUri)
   2074             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   2075             .build();
   2076     channelBuilder.nameResolverFactory(nameResolverFactory);
   2077     createChannel();
   2078     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
   2079     assertEquals(0, nameResolver.refreshCalled);
   2080 
   2081     channel.resetConnectBackoff();
   2082 
   2083     assertEquals(0, nameResolver.refreshCalled);
   2084   }
   2085 
   2086   @Test
   2087   public void resetConnectBackoff_noOpWhenChannelShutdown() {
   2088     FakeNameResolverFactory nameResolverFactory =
   2089         new FakeNameResolverFactory.Builder(expectedUri).build();
   2090     channelBuilder.nameResolverFactory(nameResolverFactory);
   2091     createChannel();
   2092 
   2093     channel.shutdown();
   2094     assertTrue(channel.isShutdown());
   2095     channel.resetConnectBackoff();
   2096 
   2097     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
   2098     assertEquals(0, nameResolver.refreshCalled);
   2099   }
   2100 
   2101   @Test
   2102   public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
   2103     FakeNameResolverFactory nameResolverFactory =
   2104         new FakeNameResolverFactory.Builder(expectedUri).build();
   2105     channelBuilder.nameResolverFactory(nameResolverFactory);
   2106     requestConnection = false;
   2107     createChannel();
   2108 
   2109     channel.resetConnectBackoff();
   2110 
   2111     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
   2112     assertEquals(0, nameResolver.refreshCalled);
   2113   }
   2114 
   2115   @Test
   2116   public void channelsAndSubchannels_instrumented_name() throws Exception {
   2117     createChannel();
   2118     assertEquals(TARGET, getStats(channel).target);
   2119 
   2120     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2121     assertEquals(Collections.singletonList(addressGroup).toString(),
   2122         getStats((AbstractSubchannel) subchannel).target);
   2123   }
   2124 
   2125   @Test
   2126   public void channelTracing_channelCreationEvent() throws Exception {
   2127     timer.forwardNanos(1234);
   2128     channelBuilder.maxTraceEvents(10);
   2129     createChannel();
   2130     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2131         .setDescription("Channel created")
   2132         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2133         .setTimestampNanos(timer.getTicker().read())
   2134         .build());
   2135   }
   2136 
   2137   @Test
   2138   public void channelTracing_subchannelCreationEvents() throws Exception {
   2139     channelBuilder.maxTraceEvents(10);
   2140     createChannel();
   2141     timer.forwardNanos(1234);
   2142     AbstractSubchannel subchannel =
   2143         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2144     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2145         .setDescription("Child channel created")
   2146         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2147         .setTimestampNanos(timer.getTicker().read())
   2148         .setSubchannelRef(subchannel.getInternalSubchannel())
   2149         .build());
   2150     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2151         .setDescription("Subchannel created")
   2152         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2153         .setTimestampNanos(timer.getTicker().read())
   2154         .build());
   2155   }
   2156 
   2157   @Test
   2158   public void channelTracing_nameResolvingErrorEvent() throws Exception {
   2159     timer.forwardNanos(1234);
   2160     channelBuilder.maxTraceEvents(10);
   2161     createChannel();
   2162     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2163         .setDescription("Failed to resolve name")
   2164         .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
   2165         .setTimestampNanos(timer.getTicker().read())
   2166         .build());
   2167   }
   2168 
   2169   @Test
   2170   public void channelTracing_nameResolvedEvent() throws Exception {
   2171     timer.forwardNanos(1234);
   2172     channelBuilder.maxTraceEvents(10);
   2173     FakeNameResolverFactory nameResolverFactory =
   2174         new FakeNameResolverFactory.Builder(expectedUri)
   2175             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   2176             .build();
   2177     channelBuilder.nameResolverFactory(nameResolverFactory);
   2178     createChannel();
   2179     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2180         .setDescription("Address resolved: "
   2181             + Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   2182         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2183         .setTimestampNanos(timer.getTicker().read())
   2184         .build());
   2185   }
   2186 
   2187   @Test
   2188   public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception {
   2189     timer.forwardNanos(1234);
   2190     channelBuilder.maxTraceEvents(10);
   2191     List<EquivalentAddressGroup> servers = new ArrayList<>();
   2192     servers.add(new EquivalentAddressGroup(socketAddress));
   2193     FakeNameResolverFactory nameResolverFactory =
   2194         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
   2195     channelBuilder.nameResolverFactory(nameResolverFactory);
   2196     createChannel();
   2197 
   2198     int prevSize = getStats(channel).channelTrace.events.size();
   2199     nameResolverFactory.resolvers.get(0).listener.onAddresses(
   2200         Collections.singletonList(new EquivalentAddressGroup(
   2201             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
   2202         Attributes.EMPTY);
   2203     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
   2204 
   2205     prevSize = getStats(channel).channelTrace.events.size();
   2206     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
   2207     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
   2208 
   2209     prevSize = getStats(channel).channelTrace.events.size();
   2210     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
   2211     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
   2212 
   2213     prevSize = getStats(channel).channelTrace.events.size();
   2214     nameResolverFactory.resolvers.get(0).listener.onAddresses(
   2215         Collections.singletonList(new EquivalentAddressGroup(
   2216             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
   2217         Attributes.EMPTY);
   2218     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
   2219   }
   2220 
   2221   @Test
   2222   public void channelTracing_serviceConfigChange() throws Exception {
   2223     timer.forwardNanos(1234);
   2224     channelBuilder.maxTraceEvents(10);
   2225     List<EquivalentAddressGroup> servers = new ArrayList<>();
   2226     servers.add(new EquivalentAddressGroup(socketAddress));
   2227     FakeNameResolverFactory nameResolverFactory =
   2228         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
   2229     channelBuilder.nameResolverFactory(nameResolverFactory);
   2230     createChannel();
   2231 
   2232     int prevSize = getStats(channel).channelTrace.events.size();
   2233     Attributes attributes =
   2234         Attributes.newBuilder()
   2235             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
   2236             .build();
   2237     nameResolverFactory.resolvers.get(0).listener.onAddresses(
   2238         Collections.singletonList(new EquivalentAddressGroup(
   2239             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
   2240         attributes);
   2241     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
   2242     assertThat(getStats(channel).channelTrace.events.get(prevSize))
   2243         .isEqualTo(new ChannelTrace.Event.Builder()
   2244             .setDescription("Service config changed")
   2245             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2246             .setTimestampNanos(timer.getTicker().read())
   2247             .build());
   2248 
   2249     prevSize = getStats(channel).channelTrace.events.size();
   2250     nameResolverFactory.resolvers.get(0).listener.onAddresses(
   2251         Collections.singletonList(new EquivalentAddressGroup(
   2252             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
   2253         attributes);
   2254     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
   2255 
   2256     prevSize = getStats(channel).channelTrace.events.size();
   2257     Map<String, Object> serviceConfig = new HashMap<String, Object>();
   2258     serviceConfig.put("methodConfig", new HashMap<String, Object>());
   2259     attributes =
   2260         Attributes.newBuilder()
   2261             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
   2262             .build();
   2263     timer.forwardNanos(1234);
   2264     nameResolverFactory.resolvers.get(0).listener.onAddresses(
   2265         Collections.singletonList(new EquivalentAddressGroup(
   2266             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
   2267         attributes);
   2268     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
   2269     assertThat(getStats(channel).channelTrace.events.get(prevSize))
   2270         .isEqualTo(new ChannelTrace.Event.Builder()
   2271             .setDescription("Service config changed")
   2272             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2273             .setTimestampNanos(timer.getTicker().read())
   2274             .build());
   2275   }
   2276 
   2277   @Test
   2278   public void channelTracing_stateChangeEvent() throws Exception {
   2279     channelBuilder.maxTraceEvents(10);
   2280     createChannel();
   2281     timer.forwardNanos(1234);
   2282     helper.updateBalancingState(CONNECTING, mockPicker);
   2283     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2284         .setDescription("Entering CONNECTING state")
   2285         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2286         .setTimestampNanos(timer.getTicker().read())
   2287         .build());
   2288   }
   2289 
   2290   @Test
   2291   public void channelTracing_subchannelStateChangeEvent() throws Exception {
   2292     channelBuilder.maxTraceEvents(10);
   2293     createChannel();
   2294     AbstractSubchannel subchannel =
   2295         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2296     timer.forwardNanos(1234);
   2297     subchannel.obtainActiveTransport();
   2298     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2299         .setDescription("Entering CONNECTING state")
   2300         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2301         .setTimestampNanos(timer.getTicker().read())
   2302         .build());
   2303   }
   2304 
   2305   @Test
   2306   public void channelTracing_oobChannelStateChangeEvent() throws Exception {
   2307     channelBuilder.maxTraceEvents(10);
   2308     createChannel();
   2309     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
   2310     timer.forwardNanos(1234);
   2311     oobChannel.handleSubchannelStateChange(
   2312         ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
   2313     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2314         .setDescription("Entering CONNECTING state")
   2315         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2316         .setTimestampNanos(timer.getTicker().read())
   2317         .build());
   2318   }
   2319 
   2320   @Test
   2321   public void channelTracing_oobChannelCreationEvents() throws Exception {
   2322     channelBuilder.maxTraceEvents(10);
   2323     createChannel();
   2324     timer.forwardNanos(1234);
   2325     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
   2326     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2327         .setDescription("Child channel created")
   2328         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2329         .setTimestampNanos(timer.getTicker().read())
   2330         .setChannelRef(oobChannel)
   2331         .build());
   2332     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
   2333         .setDescription("OobChannel created")
   2334         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2335         .setTimestampNanos(timer.getTicker().read())
   2336         .build());
   2337     assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
   2338         new ChannelTrace.Event.Builder()
   2339             .setDescription("Subchannel created")
   2340             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
   2341             .setTimestampNanos(timer.getTicker().read())
   2342             .build());
   2343   }
   2344 
   2345   @Test
   2346   public void channelsAndSubchannels_instrumented_state() throws Exception {
   2347     createChannel();
   2348 
   2349     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
   2350     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
   2351     helper = helperCaptor.getValue();
   2352 
   2353     assertEquals(IDLE, getStats(channel).state);
   2354     helper.updateBalancingState(CONNECTING, mockPicker);
   2355     assertEquals(CONNECTING, getStats(channel).state);
   2356 
   2357     AbstractSubchannel subchannel =
   2358         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2359 
   2360     assertEquals(IDLE, getStats(subchannel).state);
   2361     subchannel.requestConnection();
   2362     assertEquals(CONNECTING, getStats(subchannel).state);
   2363 
   2364     MockClientTransportInfo transportInfo = transports.poll();
   2365 
   2366     assertEquals(CONNECTING, getStats(subchannel).state);
   2367     transportInfo.listener.transportReady();
   2368     assertEquals(READY, getStats(subchannel).state);
   2369 
   2370     assertEquals(CONNECTING, getStats(channel).state);
   2371     helper.updateBalancingState(READY, mockPicker);
   2372     assertEquals(READY, getStats(channel).state);
   2373 
   2374     channel.shutdownNow();
   2375     assertEquals(SHUTDOWN, getStats(channel).state);
   2376     assertEquals(SHUTDOWN, getStats(subchannel).state);
   2377   }
   2378 
   2379   @Test
   2380   public void channelStat_callStarted() throws Exception {
   2381     createChannel();
   2382     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   2383     assertEquals(0, getStats(channel).callsStarted);
   2384     call.start(mockCallListener, new Metadata());
   2385     assertEquals(1, getStats(channel).callsStarted);
   2386     assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos);
   2387   }
   2388 
   2389   @Test
   2390   public void channelsAndSubChannels_instrumented_success() throws Exception {
   2391     channelsAndSubchannels_instrumented0(true);
   2392   }
   2393 
   2394   @Test
   2395   public void channelsAndSubChannels_instrumented_fail() throws Exception {
   2396     channelsAndSubchannels_instrumented0(false);
   2397   }
   2398 
   2399   private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
   2400     createChannel();
   2401 
   2402     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   2403 
   2404     // Channel stat bumped when ClientCall.start() called
   2405     assertEquals(0, getStats(channel).callsStarted);
   2406     call.start(mockCallListener, new Metadata());
   2407     assertEquals(1, getStats(channel).callsStarted);
   2408 
   2409     ClientStream mockStream = mock(ClientStream.class);
   2410     ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
   2411     AbstractSubchannel subchannel =
   2412         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2413     subchannel.requestConnection();
   2414     MockClientTransportInfo transportInfo = transports.poll();
   2415     transportInfo.listener.transportReady();
   2416     ClientTransport mockTransport = transportInfo.transport;
   2417     when(mockTransport.newStream(
   2418             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
   2419         .thenReturn(mockStream);
   2420     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
   2421         PickResult.withSubchannel(subchannel, factory));
   2422 
   2423     // subchannel stat bumped when call gets assigned to it
   2424     assertEquals(0, getStats(subchannel).callsStarted);
   2425     helper.updateBalancingState(READY, mockPicker);
   2426     assertEquals(1, executor.runDueTasks());
   2427     verify(mockStream).start(streamListenerCaptor.capture());
   2428     assertEquals(1, getStats(subchannel).callsStarted);
   2429 
   2430     ClientStreamListener streamListener = streamListenerCaptor.getValue();
   2431     call.halfClose();
   2432 
   2433     // closing stream listener affects subchannel stats immediately
   2434     assertEquals(0, getStats(subchannel).callsSucceeded);
   2435     assertEquals(0, getStats(subchannel).callsFailed);
   2436     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
   2437     if (success) {
   2438       assertEquals(1, getStats(subchannel).callsSucceeded);
   2439       assertEquals(0, getStats(subchannel).callsFailed);
   2440     } else {
   2441       assertEquals(0, getStats(subchannel).callsSucceeded);
   2442       assertEquals(1, getStats(subchannel).callsFailed);
   2443     }
   2444 
   2445     // channel stats bumped when the ClientCall.Listener is notified
   2446     assertEquals(0, getStats(channel).callsSucceeded);
   2447     assertEquals(0, getStats(channel).callsFailed);
   2448     executor.runDueTasks();
   2449     if (success) {
   2450       assertEquals(1, getStats(channel).callsSucceeded);
   2451       assertEquals(0, getStats(channel).callsFailed);
   2452     } else {
   2453       assertEquals(0, getStats(channel).callsSucceeded);
   2454       assertEquals(1, getStats(channel).callsFailed);
   2455     }
   2456   }
   2457 
   2458   @Test
   2459   public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
   2460     channelsAndSubchannels_oob_instrumented0(true);
   2461   }
   2462 
   2463   @Test
   2464   public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
   2465     channelsAndSubchannels_oob_instrumented0(false);
   2466   }
   2467 
   2468   private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
   2469     // set up
   2470     ClientStream mockStream = mock(ClientStream.class);
   2471     createChannel();
   2472 
   2473     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
   2474     AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
   2475     FakeClock callExecutor = new FakeClock();
   2476     CallOptions options =
   2477         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
   2478     ClientCall<String, Integer> call = oobChannel.newCall(method, options);
   2479     Metadata headers = new Metadata();
   2480 
   2481     // Channel stat bumped when ClientCall.start() called
   2482     assertEquals(0, getStats(oobChannel).callsStarted);
   2483     call.start(mockCallListener, headers);
   2484     assertEquals(1, getStats(oobChannel).callsStarted);
   2485 
   2486     MockClientTransportInfo transportInfo = transports.poll();
   2487     ConnectionClientTransport mockTransport = transportInfo.transport;
   2488     ManagedClientTransport.Listener transportListener = transportInfo.listener;
   2489     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
   2490         .thenReturn(mockStream);
   2491 
   2492     // subchannel stat bumped when call gets assigned to it
   2493     assertEquals(0, getStats(oobSubchannel).callsStarted);
   2494     transportListener.transportReady();
   2495     callExecutor.runDueTasks();
   2496     verify(mockStream).start(streamListenerCaptor.capture());
   2497     assertEquals(1, getStats(oobSubchannel).callsStarted);
   2498 
   2499     ClientStreamListener streamListener = streamListenerCaptor.getValue();
   2500     call.halfClose();
   2501 
   2502     // closing stream listener affects subchannel stats immediately
   2503     assertEquals(0, getStats(oobSubchannel).callsSucceeded);
   2504     assertEquals(0, getStats(oobSubchannel).callsFailed);
   2505     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
   2506     if (success) {
   2507       assertEquals(1, getStats(oobSubchannel).callsSucceeded);
   2508       assertEquals(0, getStats(oobSubchannel).callsFailed);
   2509     } else {
   2510       assertEquals(0, getStats(oobSubchannel).callsSucceeded);
   2511       assertEquals(1, getStats(oobSubchannel).callsFailed);
   2512     }
   2513 
   2514     // channel stats bumped when the ClientCall.Listener is notified
   2515     assertEquals(0, getStats(oobChannel).callsSucceeded);
   2516     assertEquals(0, getStats(oobChannel).callsFailed);
   2517     callExecutor.runDueTasks();
   2518     if (success) {
   2519       assertEquals(1, getStats(oobChannel).callsSucceeded);
   2520       assertEquals(0, getStats(oobChannel).callsFailed);
   2521     } else {
   2522       assertEquals(0, getStats(oobChannel).callsSucceeded);
   2523       assertEquals(1, getStats(oobChannel).callsFailed);
   2524     }
   2525     // oob channel is separate from the original channel
   2526     assertEquals(0, getStats(channel).callsSucceeded);
   2527     assertEquals(0, getStats(channel).callsFailed);
   2528   }
   2529 
   2530   @Test
   2531   public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
   2532     createChannel();
   2533 
   2534     String authority = "oobauthority";
   2535     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
   2536     assertEquals(authority, getStats(oobChannel).target);
   2537   }
   2538 
   2539   @Test
   2540   public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
   2541     createChannel();
   2542 
   2543     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
   2544     assertEquals(IDLE, getStats(oobChannel).state);
   2545 
   2546     oobChannel.getSubchannel().requestConnection();
   2547     assertEquals(CONNECTING, getStats(oobChannel).state);
   2548 
   2549     MockClientTransportInfo transportInfo = transports.poll();
   2550     ManagedClientTransport.Listener transportListener = transportInfo.listener;
   2551 
   2552     transportListener.transportReady();
   2553     assertEquals(READY, getStats(oobChannel).state);
   2554 
   2555     // oobchannel state is separate from the ManagedChannel
   2556     assertEquals(IDLE, getStats(channel).state);
   2557     channel.shutdownNow();
   2558     assertEquals(SHUTDOWN, getStats(channel).state);
   2559     assertEquals(SHUTDOWN, getStats(oobChannel).state);
   2560   }
   2561 
   2562   @Test
   2563   public void binaryLogInstalled() throws Exception {
   2564     final SettableFuture<Boolean> intercepted = SettableFuture.create();
   2565     channelBuilder.binlog = new BinaryLog() {
   2566       @Override
   2567       public void close() throws IOException {
   2568         // noop
   2569       }
   2570 
   2571       @Override
   2572       public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
   2573           ServerMethodDefinition<ReqT, RespT> oMethodDef) {
   2574         return oMethodDef;
   2575       }
   2576 
   2577       @Override
   2578       public Channel wrapChannel(Channel channel) {
   2579         return ClientInterceptors.intercept(channel,
   2580             new ClientInterceptor() {
   2581               @Override
   2582               public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
   2583                   MethodDescriptor<ReqT, RespT> method,
   2584                   CallOptions callOptions,
   2585                   Channel next) {
   2586                 intercepted.set(true);
   2587                 return next.newCall(method, callOptions);
   2588               }
   2589             });
   2590       }
   2591     };
   2592 
   2593     createChannel();
   2594     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   2595     call.start(mockCallListener, new Metadata());
   2596     assertTrue(intercepted.get());
   2597   }
   2598 
   2599   @Test
   2600   public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() {
   2601     Map<String, Object> retryPolicy = new HashMap<String, Object>();
   2602     retryPolicy.put("maxAttempts", 3D);
   2603     retryPolicy.put("initialBackoff", "10s");
   2604     retryPolicy.put("maxBackoff", "30s");
   2605     retryPolicy.put("backoffMultiplier", 2D);
   2606     retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
   2607     Map<String, Object> methodConfig = new HashMap<String, Object>();
   2608     Map<String, Object> name = new HashMap<String, Object>();
   2609     name.put("service", "service");
   2610     methodConfig.put("name", Arrays.<Object>asList(name));
   2611     methodConfig.put("retryPolicy", retryPolicy);
   2612     Map<String, Object> serviceConfig = new HashMap<String, Object>();
   2613     serviceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
   2614     Attributes attributesWithRetryPolicy = Attributes
   2615         .newBuilder().set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
   2616 
   2617     FakeNameResolverFactory nameResolverFactory =
   2618         new FakeNameResolverFactory.Builder(expectedUri)
   2619             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
   2620             .build();
   2621     nameResolverFactory.nextResolvedAttributes.set(attributesWithRetryPolicy);
   2622     channelBuilder.nameResolverFactory(nameResolverFactory);
   2623     channelBuilder.executor(MoreExecutors.directExecutor());
   2624     channelBuilder.enableRetry();
   2625     RetriableStream.setRandom(
   2626         // not random
   2627         new Random() {
   2628           @Override
   2629           public double nextDouble() {
   2630             return 1D; // fake random
   2631           }
   2632         });
   2633 
   2634     requestConnection = false;
   2635     createChannel();
   2636 
   2637     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
   2638     call.start(mockCallListener, new Metadata());
   2639     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
   2640     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
   2641     helper = helperCaptor.getValue();
   2642     verify(mockLoadBalancer)
   2643         .handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy);
   2644 
   2645     // simulating request connection and then transport ready after resolved address
   2646     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
   2647     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
   2648         .thenReturn(PickResult.withSubchannel(subchannel));
   2649     subchannel.requestConnection();
   2650     MockClientTransportInfo transportInfo = transports.poll();
   2651     ConnectionClientTransport mockTransport = transportInfo.transport;
   2652     ClientStream mockStream = mock(ClientStream.class);
   2653     ClientStream mockStream2 = mock(ClientStream.class);
   2654     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
   2655         .thenReturn(mockStream).thenReturn(mockStream2);
   2656     transportInfo.listener.transportReady();
   2657     helper.updateBalancingState(READY, mockPicker);
   2658 
   2659     ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
   2660         ArgumentCaptor.forClass(ClientStreamListener.class);
   2661     verify(mockStream).start(streamListenerCaptor.capture());
   2662     assertThat(timer.getPendingTasks()).isEmpty();
   2663 
   2664     // trigger retry
   2665     streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());
   2666 
   2667     // in backoff
   2668     timer.forwardTime(5, TimeUnit.SECONDS);
   2669     assertThat(timer.getPendingTasks()).hasSize(1);
   2670     verify(mockStream2, never()).start(any(ClientStreamListener.class));
   2671 
   2672     // shutdown during backoff period
   2673     channel.shutdown();
   2674 
   2675     assertThat(timer.getPendingTasks()).hasSize(1);
   2676     verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
   2677 
   2678     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
   2679     call2.start(mockCallListener2, new Metadata());
   2680 
   2681     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
   2682     verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
   2683     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
   2684     assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
   2685 
   2686     // backoff ends
   2687     timer.forwardTime(5, TimeUnit.SECONDS);
   2688     assertThat(timer.getPendingTasks()).isEmpty();
   2689     verify(mockStream2).start(streamListenerCaptor.capture());
   2690     verify(mockLoadBalancer, never()).shutdown();
   2691     assertFalse(
   2692         "channel.isTerminated() is expected to be false but was true",
   2693         channel.isTerminated());
   2694 
   2695     streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
   2696     verify(mockLoadBalancer).shutdown();
   2697     // simulating the shutdown of load balancer triggers the shutdown of subchannel
   2698     subchannel.shutdown();
   2699     transportInfo.listener.transportTerminated(); // simulating transport terminated
   2700     assertTrue(
   2701         "channel.isTerminated() is expected to be true but was false",
   2702         channel.isTerminated());
   2703   }
   2704 
   2705   @Test
   2706   public void badServiceConfigIsRecoverable() throws Exception {
   2707     final List<EquivalentAddressGroup> addresses =
   2708         ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
   2709     final class FakeNameResolver extends NameResolver {
   2710       Listener listener;
   2711 
   2712       @Override
   2713       public String getServiceAuthority() {
   2714         return "also fake";
   2715       }
   2716 
   2717       @Override
   2718       public void start(Listener listener) {
   2719         this.listener = listener;
   2720         listener.onAddresses(addresses,
   2721             Attributes.newBuilder()
   2722                 .set(
   2723                     GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
   2724                     ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
   2725                 .build());
   2726       }
   2727 
   2728       @Override
   2729       public void shutdown() {}
   2730     }
   2731 
   2732     final class FakeNameResolverFactory extends NameResolver.Factory {
   2733       FakeNameResolver resolver;
   2734 
   2735       @Nullable
   2736       @Override
   2737       public NameResolver newNameResolver(URI targetUri, Attributes params) {
   2738         return (resolver = new FakeNameResolver());
   2739       }
   2740 
   2741       @Override
   2742       public String getDefaultScheme() {
   2743         return "fake";
   2744       }
   2745     }
   2746 
   2747     FakeNameResolverFactory factory = new FakeNameResolverFactory();
   2748     final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {
   2749 
   2750       CustomBuilder() {
   2751         super(TARGET);
   2752         this.executorPool = ManagedChannelImplTest.this.executorPool;
   2753         this.channelz = ManagedChannelImplTest.this.channelz;
   2754       }
   2755 
   2756       @Override
   2757       protected ClientTransportFactory buildTransportFactory() {
   2758         return mockTransportFactory;
   2759       }
   2760     }
   2761 
   2762     ManagedChannel mychannel = new CustomBuilder()
   2763         .nameResolverFactory(factory)
   2764         .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build();
   2765 
   2766     ClientCall<Void, Void> call1 =
   2767         mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
   2768     ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
   2769     executor.runDueTasks();
   2770     try {
   2771       future1.get();
   2772       Assert.fail();
   2773     } catch (ExecutionException e) {
   2774       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
   2775     }
   2776 
   2777     // ok the service config is bad, let's fix it.
   2778 
   2779     factory.resolver.listener.onAddresses(addresses,
   2780         Attributes.newBuilder()
   2781         .set(
   2782             GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
   2783             ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
   2784         .build());
   2785 
   2786     ClientCall<Void, Void> call2 = mychannel.newCall(
   2787         TestMethodDescriptors.voidMethod(),
   2788         CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
   2789     ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
   2790 
   2791     timer.forwardTime(1234, TimeUnit.SECONDS);
   2792 
   2793     executor.runDueTasks();
   2794     try {
   2795       future2.get();
   2796       Assert.fail();
   2797     } catch (ExecutionException e) {
   2798       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
   2799     }
   2800 
   2801     mychannel.shutdownNow();
   2802   }
   2803 
   2804   private static final class ChannelBuilder
   2805       extends AbstractManagedChannelImplBuilder<ChannelBuilder> {
   2806 
   2807     ChannelBuilder() {
   2808       super(TARGET);
   2809     }
   2810 
   2811     @Override protected ClientTransportFactory buildTransportFactory() {
   2812       throw new UnsupportedOperationException();
   2813     }
   2814 
   2815     @Override protected Attributes getNameResolverParams() {
   2816       return NAME_RESOLVER_PARAMS;
   2817     }
   2818   }
   2819 
   2820   private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
   2821     @Override
   2822     public BackoffPolicy get() {
   2823       return new BackoffPolicy() {
   2824         int multiplier = 1;
   2825 
   2826         @Override
   2827         public long nextBackoffNanos() {
   2828           return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++;
   2829         }
   2830       };
   2831     }
   2832   }
   2833 
   2834   private static final class FakeNameResolverFactory extends NameResolver.Factory {
   2835     final URI expectedUri;
   2836     final List<EquivalentAddressGroup> servers;
   2837     final boolean resolvedAtStart;
   2838     final Status error;
   2839     final ArrayList<FakeNameResolver> resolvers = new ArrayList<>();
   2840     // The Attributes argument of the next invocation of listener.onAddresses(servers, attrs)
   2841     final AtomicReference<Attributes> nextResolvedAttributes =
   2842         new AtomicReference<Attributes>(Attributes.EMPTY);
   2843 
   2844     FakeNameResolverFactory(
   2845         URI expectedUri,
   2846         List<EquivalentAddressGroup> servers,
   2847         boolean resolvedAtStart,
   2848         Status error) {
   2849       this.expectedUri = expectedUri;
   2850       this.servers = servers;
   2851       this.resolvedAtStart = resolvedAtStart;
   2852       this.error = error;
   2853     }
   2854 
   2855     @Override
   2856     public NameResolver newNameResolver(final URI targetUri, Attributes params) {
   2857       if (!expectedUri.equals(targetUri)) {
   2858         return null;
   2859       }
   2860       assertSame(NAME_RESOLVER_PARAMS, params);
   2861       FakeNameResolver resolver = new FakeNameResolver(error);
   2862       resolvers.add(resolver);
   2863       return resolver;
   2864     }
   2865 
   2866     @Override
   2867     public String getDefaultScheme() {
   2868       return "fake";
   2869     }
   2870 
   2871     void allResolved() {
   2872       for (FakeNameResolver resolver : resolvers) {
   2873         resolver.resolved();
   2874       }
   2875     }
   2876 
   2877     final class FakeNameResolver extends NameResolver {
   2878       Listener listener;
   2879       boolean shutdown;
   2880       int refreshCalled;
   2881       Status error;
   2882 
   2883       FakeNameResolver(Status error) {
   2884         this.error = error;
   2885       }
   2886 
   2887       @Override public String getServiceAuthority() {
   2888         return expectedUri.getAuthority();
   2889       }
   2890 
   2891       @Override public void start(final Listener listener) {
   2892         this.listener = listener;
   2893         if (resolvedAtStart) {
   2894           resolved();
   2895         }
   2896       }
   2897 
   2898       @Override public void refresh() {
   2899         assertNotNull(listener);
   2900         refreshCalled++;
   2901         resolved();
   2902       }
   2903 
   2904       void resolved() {
   2905         if (error != null) {
   2906           listener.onError(error);
   2907           return;
   2908         }
   2909         listener.onAddresses(servers, nextResolvedAttributes.get());
   2910       }
   2911 
   2912       @Override public void shutdown() {
   2913         shutdown = true;
   2914       }
   2915     }
   2916 
   2917     static final class Builder {
   2918       final URI expectedUri;
   2919       List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of();
   2920       boolean resolvedAtStart = true;
   2921       Status error = null;
   2922 
   2923       Builder(URI expectedUri) {
   2924         this.expectedUri = expectedUri;
   2925       }
   2926 
   2927       Builder setServers(List<EquivalentAddressGroup> servers) {
   2928         this.servers = servers;
   2929         return this;
   2930       }
   2931 
   2932       Builder setResolvedAtStart(boolean resolvedAtStart) {
   2933         this.resolvedAtStart = resolvedAtStart;
   2934         return this;
   2935       }
   2936 
   2937       Builder setError(Status error) {
   2938         this.error = error;
   2939         return this;
   2940       }
   2941 
   2942       FakeNameResolverFactory build() {
   2943         return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
   2944       }
   2945     }
   2946   }
   2947 
   2948   private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
   2949     return subchannel.getInternalSubchannel().getStats().get();
   2950   }
   2951 
   2952   private static ChannelStats getStats(
   2953       InternalInstrumented<ChannelStats> instrumented) throws Exception {
   2954     return instrumented.getStats().get();
   2955   }
   2956 
   2957   private FakeClock.ScheduledTask getNameResolverRefresh() {
   2958     return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
   2959   }
   2960 }
   2961