Home | History | Annotate | Download | only in grpclb
      1 /*
      2  * Copyright 2017 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.grpclb;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import com.google.protobuf.util.Timestamps;
     22 import io.grpc.CallOptions;
     23 import io.grpc.ClientStreamTracer;
     24 import io.grpc.Metadata;
     25 import io.grpc.Status;
     26 import io.grpc.internal.TimeProvider;
     27 import io.grpc.lb.v1.ClientStats;
     28 import io.grpc.lb.v1.ClientStatsPerToken;
     29 import java.util.Collections;
     30 import java.util.HashMap;
     31 import java.util.Map;
     32 import java.util.Map.Entry;
     33 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
     34 import javax.annotation.concurrent.GuardedBy;
     35 import javax.annotation.concurrent.ThreadSafe;
     36 
     37 /**
     38  * Record and aggregate client-side load data for GRPCLB.  This records load occurred during the
     39  * span of an LB stream with the remote load-balancer.
     40  */
     41 @ThreadSafe
     42 final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
     43 
     44   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsStartedUpdater =
     45       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted");
     46   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFinishedUpdater =
     47       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished");
     48   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFailedToSendUpdater =
     49       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend");
     50   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder>
     51       callsFinishedKnownReceivedUpdater =
     52           AtomicLongFieldUpdater.newUpdater(
     53               GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived");
     54 
     55   private final TimeProvider time;
     56   @SuppressWarnings("unused")
     57   private volatile long callsStarted;
     58   @SuppressWarnings("unused")
     59   private volatile long callsFinished;
     60 
     61   private static final class LongHolder {
     62     long num;
     63   }
     64 
     65   // Specific finish types
     66   @GuardedBy("this")
     67   private Map<String, LongHolder> callsDroppedPerToken = new HashMap<String, LongHolder>(1);
     68   @SuppressWarnings("unused")
     69   private volatile long callsFailedToSend;
     70   @SuppressWarnings("unused")
     71   private volatile long callsFinishedKnownReceived;
     72 
     73   GrpclbClientLoadRecorder(TimeProvider time) {
     74     this.time = checkNotNull(time, "time provider");
     75   }
     76 
     77   @Override
     78   public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
     79     callsStartedUpdater.getAndIncrement(this);
     80     return new StreamTracer();
     81   }
     82 
     83   /**
     84    * Records that a request has been dropped as instructed by the remote balancer.
     85    */
     86   void recordDroppedRequest(String token) {
     87     callsStartedUpdater.getAndIncrement(this);
     88     callsFinishedUpdater.getAndIncrement(this);
     89 
     90     synchronized (this) {
     91       LongHolder holder;
     92       if ((holder = callsDroppedPerToken.get(token)) == null) {
     93         callsDroppedPerToken.put(token, (holder = new LongHolder()));
     94       }
     95       holder.num++;
     96     }
     97   }
     98 
     99   /**
    100    * Generate the report with the data recorded this LB stream since the last report.
    101    */
    102   ClientStats generateLoadReport() {
    103     ClientStats.Builder statsBuilder =
    104         ClientStats.newBuilder()
    105         .setTimestamp(Timestamps.fromNanos(time.currentTimeNanos()))
    106         .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
    107         .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
    108         .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
    109         .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0));
    110 
    111     Map<String, LongHolder> localCallsDroppedPerToken = Collections.emptyMap();
    112     synchronized (this) {
    113       if (!callsDroppedPerToken.isEmpty()) {
    114         localCallsDroppedPerToken = callsDroppedPerToken;
    115         callsDroppedPerToken = new HashMap<String, LongHolder>(localCallsDroppedPerToken.size());
    116       }
    117     }
    118     for (Entry<String, LongHolder> entry : localCallsDroppedPerToken.entrySet()) {
    119       statsBuilder.addCallsFinishedWithDrop(
    120           ClientStatsPerToken.newBuilder()
    121               .setLoadBalanceToken(entry.getKey())
    122               .setNumCalls(entry.getValue().num)
    123               .build());
    124     }
    125     return statsBuilder.build();
    126   }
    127 
    128   private class StreamTracer extends ClientStreamTracer {
    129     private volatile boolean headersSent;
    130     private volatile boolean anythingReceived;
    131 
    132     @Override
    133     public void outboundHeaders() {
    134       headersSent = true;
    135     }
    136 
    137     @Override
    138     public void inboundHeaders() {
    139       anythingReceived = true;
    140     }
    141 
    142     @Override
    143     public void inboundMessage(int seqNo) {
    144       anythingReceived = true;
    145     }
    146 
    147     @Override
    148     public void streamClosed(Status status) {
    149       callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
    150       if (!headersSent) {
    151         callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
    152       }
    153       if (anythingReceived) {
    154         callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
    155       }
    156     }
    157   }
    158 }
    159