Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2016 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import io.grpc.ConnectivityState;
     22 import io.grpc.ManagedChannel;
     23 import java.util.ArrayList;
     24 import java.util.concurrent.Executor;
     25 import javax.annotation.Nonnull;
     26 import javax.annotation.concurrent.NotThreadSafe;
     27 
     28 /**
     29  * Manages connectivity states of the channel. Used for {@link ManagedChannel#getState} to read the
     30  * current state of the channel, for {@link ManagedChannel#notifyWhenStateChanged} to add
     31  * listeners to state change events, and for {@link io.grpc.LoadBalancer.Helper#updateBalancingState
     32  * LoadBalancer.Helper#updateBalancingState} to update the state and run the {@link #gotoState}s.
     33  */
     34 @NotThreadSafe
     35 final class ConnectivityStateManager {
     36   private ArrayList<Listener> listeners = new ArrayList<>();
     37 
     38   private volatile ConnectivityState state = ConnectivityState.IDLE;
     39 
     40   /**
     41    * Adds a listener for state change event.
     42    *
     43    * <p>The {@code executor} must be one that can run RPC call listeners.
     44    */
     45   void notifyWhenStateChanged(Runnable callback, Executor executor, ConnectivityState source) {
     46     checkNotNull(callback, "callback");
     47     checkNotNull(executor, "executor");
     48     checkNotNull(source, "source");
     49 
     50     Listener stateChangeListener = new Listener(callback, executor);
     51     if (state != source) {
     52       stateChangeListener.runInExecutor();
     53     } else {
     54       listeners.add(stateChangeListener);
     55     }
     56   }
     57 
     58   /**
     59    * Connectivity state is changed to the specified value. Will trigger some notifications that have
     60    * been registered earlier by {@link ManagedChannel#notifyWhenStateChanged}.
     61    */
     62   void gotoState(@Nonnull ConnectivityState newState) {
     63     checkNotNull(newState, "newState");
     64     if (state != newState && state != ConnectivityState.SHUTDOWN) {
     65       state = newState;
     66       if (listeners.isEmpty()) {
     67         return;
     68       }
     69       // Swap out callback list before calling them, because a callback may register new callbacks,
     70       // if run in direct executor, can cause ConcurrentModificationException.
     71       ArrayList<Listener> savedListeners = listeners;
     72       listeners = new ArrayList<>();
     73       for (Listener listener : savedListeners) {
     74         listener.runInExecutor();
     75       }
     76     }
     77   }
     78 
     79   /**
     80    * Gets the current connectivity state of the channel. This method is threadsafe.
     81    */
     82   ConnectivityState getState() {
     83     ConnectivityState stateCopy = state;
     84     if (stateCopy == null) {
     85       throw new UnsupportedOperationException("Channel state API is not implemented");
     86     }
     87     return stateCopy;
     88   }
     89 
     90   private static final class Listener {
     91     final Runnable callback;
     92     final Executor executor;
     93 
     94     Listener(Runnable callback, Executor executor) {
     95       this.callback = callback;
     96       this.executor = executor;
     97     }
     98 
     99     void runInExecutor() {
    100       executor.execute(callback);
    101     }
    102   }
    103 }
    104