Home | History | Annotate | Download | only in export
      1 /*
      2  * Copyright 2018, OpenCensus Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.opencensus.implcore.trace.export;
     18 
     19 import com.google.common.collect.EvictingQueue;
     20 import io.opencensus.implcore.internal.EventQueue;
     21 import io.opencensus.implcore.trace.RecordEventsSpanImpl;
     22 import io.opencensus.trace.Status;
     23 import io.opencensus.trace.Status.CanonicalCode;
     24 import io.opencensus.trace.export.SampledSpanStore;
     25 import io.opencensus.trace.export.SpanData;
     26 import java.util.ArrayList;
     27 import java.util.Collection;
     28 import java.util.Collections;
     29 import java.util.EnumMap;
     30 import java.util.HashMap;
     31 import java.util.HashSet;
     32 import java.util.List;
     33 import java.util.Map;
     34 import java.util.Set;
     35 import java.util.concurrent.TimeUnit;
     36 import javax.annotation.Nullable;
     37 import javax.annotation.concurrent.GuardedBy;
     38 import javax.annotation.concurrent.ThreadSafe;
     39 
     40 /** In-process implementation of the {@link SampledSpanStore}. */
     41 @ThreadSafe
     42 public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl {
     43   private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10;
     44   private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5;
     45   private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1);
     46   private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length;
     47   // The total number of canonical codes - 1 (the OK code).
     48   private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1;
     49   private static final int MAX_PER_SPAN_NAME_SAMPLES =
     50       NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS
     51           + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS;
     52 
     53   // Used to stream the register/unregister events to the implementation to avoid lock contention
     54   // between the main threads and the worker thread.
     55   private final EventQueue eventQueue;
     56 
     57   @GuardedBy("samples")
     58   private final Map<String, PerSpanNameSamples> samples;
     59 
     60   private static final class Bucket {
     61 
     62     private final EvictingQueue<RecordEventsSpanImpl> sampledSpansQueue;
     63     private final EvictingQueue<RecordEventsSpanImpl> notSampledSpansQueue;
     64     private long lastSampledNanoTime;
     65     private long lastNotSampledNanoTime;
     66 
     67     private Bucket(int numSamples) {
     68       sampledSpansQueue = EvictingQueue.create(numSamples);
     69       notSampledSpansQueue = EvictingQueue.create(numSamples);
     70     }
     71 
     72     private void considerForSampling(RecordEventsSpanImpl span) {
     73       long spanEndNanoTime = span.getEndNanoTime();
     74       if (span.getContext().getTraceOptions().isSampled()) {
     75         // Need to compare by doing the subtraction all the time because in case of an overflow,
     76         // this may never sample again (at least for the next ~200 years). No real chance to
     77         // overflow two times because that means the process runs for ~200 years.
     78         if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) {
     79           sampledSpansQueue.add(span);
     80           lastSampledNanoTime = spanEndNanoTime;
     81         }
     82       } else {
     83         // Need to compare by doing the subtraction all the time because in case of an overflow,
     84         // this may never sample again (at least for the next ~200 years). No real chance to
     85         // overflow two times because that means the process runs for ~200 years.
     86         if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) {
     87           notSampledSpansQueue.add(span);
     88           lastNotSampledNanoTime = spanEndNanoTime;
     89         }
     90       }
     91     }
     92 
     93     private void getSamples(int maxSpansToReturn, List<RecordEventsSpanImpl> output) {
     94       getSamples(maxSpansToReturn, output, sampledSpansQueue);
     95       getSamples(maxSpansToReturn, output, notSampledSpansQueue);
     96     }
     97 
     98     private static void getSamples(
     99         int maxSpansToReturn,
    100         List<RecordEventsSpanImpl> output,
    101         EvictingQueue<RecordEventsSpanImpl> queue) {
    102       for (RecordEventsSpanImpl span : queue) {
    103         if (output.size() >= maxSpansToReturn) {
    104           break;
    105         }
    106         output.add(span);
    107       }
    108     }
    109 
    110     private void getSamplesFilteredByLatency(
    111         long latencyLowerNs,
    112         long latencyUpperNs,
    113         int maxSpansToReturn,
    114         List<RecordEventsSpanImpl> output) {
    115       getSamplesFilteredByLatency(
    116           latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue);
    117       getSamplesFilteredByLatency(
    118           latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue);
    119     }
    120 
    121     private static void getSamplesFilteredByLatency(
    122         long latencyLowerNs,
    123         long latencyUpperNs,
    124         int maxSpansToReturn,
    125         List<RecordEventsSpanImpl> output,
    126         EvictingQueue<RecordEventsSpanImpl> queue) {
    127       for (RecordEventsSpanImpl span : queue) {
    128         if (output.size() >= maxSpansToReturn) {
    129           break;
    130         }
    131         long spanLatencyNs = span.getLatencyNs();
    132         if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) {
    133           output.add(span);
    134         }
    135       }
    136     }
    137 
    138     private int getNumSamples() {
    139       return sampledSpansQueue.size() + notSampledSpansQueue.size();
    140     }
    141   }
    142 
    143   /**
    144    * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical
    145    * codes other than OK.
    146    */
    147   private static final class PerSpanNameSamples {
    148 
    149     private final Bucket[] latencyBuckets;
    150     private final Bucket[] errorBuckets;
    151 
    152     private PerSpanNameSamples() {
    153       latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS];
    154       for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
    155         latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET);
    156       }
    157       errorBuckets = new Bucket[NUM_ERROR_BUCKETS];
    158       for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
    159         errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET);
    160       }
    161     }
    162 
    163     @Nullable
    164     private Bucket getLatencyBucket(long latencyNs) {
    165       for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
    166         LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
    167         if (latencyNs >= boundaries.getLatencyLowerNs()
    168             && latencyNs < boundaries.getLatencyUpperNs()) {
    169           return latencyBuckets[i];
    170         }
    171       }
    172       // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen
    173       // in real production because System#nanoTime is monotonic.
    174       return null;
    175     }
    176 
    177     private Bucket getErrorBucket(CanonicalCode code) {
    178       return errorBuckets[code.value() - 1];
    179     }
    180 
    181     private void considerForSampling(RecordEventsSpanImpl span) {
    182       Status status = span.getStatus();
    183       // Null status means running Span, this should not happen in production, but the library
    184       // should not crash because of this.
    185       if (status != null) {
    186         Bucket bucket =
    187             status.isOk()
    188                 ? getLatencyBucket(span.getLatencyNs())
    189                 : getErrorBucket(status.getCanonicalCode());
    190         // If unable to find the bucket, ignore this Span.
    191         if (bucket != null) {
    192           bucket.considerForSampling(span);
    193         }
    194       }
    195     }
    196 
    197     private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() {
    198       Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries =
    199           new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class);
    200       for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
    201         latencyBucketSummaries.put(
    202             LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples());
    203       }
    204       return latencyBucketSummaries;
    205     }
    206 
    207     private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() {
    208       Map<CanonicalCode, Integer> errorBucketSummaries =
    209           new EnumMap<CanonicalCode, Integer>(CanonicalCode.class);
    210       for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
    211         errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples());
    212       }
    213       return errorBucketSummaries;
    214     }
    215 
    216     private List<RecordEventsSpanImpl> getErrorSamples(
    217         @Nullable CanonicalCode code, int maxSpansToReturn) {
    218       ArrayList<RecordEventsSpanImpl> output =
    219           new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn);
    220       if (code != null) {
    221         getErrorBucket(code).getSamples(maxSpansToReturn, output);
    222       } else {
    223         for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
    224           errorBuckets[i].getSamples(maxSpansToReturn, output);
    225         }
    226       }
    227       return output;
    228     }
    229 
    230     private List<RecordEventsSpanImpl> getLatencySamples(
    231         long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) {
    232       ArrayList<RecordEventsSpanImpl> output =
    233           new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn);
    234       for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
    235         LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
    236         if (latencyUpperNs >= boundaries.getLatencyLowerNs()
    237             && latencyLowerNs < boundaries.getLatencyUpperNs()) {
    238           latencyBuckets[i].getSamplesFilteredByLatency(
    239               latencyLowerNs, latencyUpperNs, maxSpansToReturn, output);
    240         }
    241       }
    242       return output;
    243     }
    244   }
    245 
    246   /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */
    247   InProcessSampledSpanStoreImpl(EventQueue eventQueue) {
    248     samples = new HashMap<String, PerSpanNameSamples>();
    249     this.eventQueue = eventQueue;
    250   }
    251 
    252   @Override
    253   public Summary getSummary() {
    254     Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>();
    255     synchronized (samples) {
    256       for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) {
    257         ret.put(
    258             it.getKey(),
    259             PerSpanNameSummary.create(
    260                 it.getValue().getNumbersOfLatencySampledSpans(),
    261                 it.getValue().getNumbersOfErrorSampledSpans()));
    262       }
    263     }
    264     return Summary.create(ret);
    265   }
    266 
    267   @Override
    268   public void considerForSampling(RecordEventsSpanImpl span) {
    269     synchronized (samples) {
    270       String spanName = span.getName();
    271       if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) {
    272         samples.put(spanName, new PerSpanNameSamples());
    273       }
    274       PerSpanNameSamples perSpanNameSamples = samples.get(spanName);
    275       if (perSpanNameSamples != null) {
    276         perSpanNameSamples.considerForSampling(span);
    277       }
    278     }
    279   }
    280 
    281   @Override
    282   public void registerSpanNamesForCollection(Collection<String> spanNames) {
    283     eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames));
    284   }
    285 
    286   @Override
    287   protected void shutdown() {
    288     eventQueue.shutdown();
    289   }
    290 
    291   private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) {
    292     synchronized (samples) {
    293       for (String spanName : spanNames) {
    294         if (!samples.containsKey(spanName)) {
    295           samples.put(spanName, new PerSpanNameSamples());
    296         }
    297       }
    298     }
    299   }
    300 
    301   private static final class RegisterSpanNameEvent implements EventQueue.Entry {
    302     private final InProcessSampledSpanStoreImpl sampledSpanStore;
    303     private final Collection<String> spanNames;
    304 
    305     private RegisterSpanNameEvent(
    306         InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
    307       this.sampledSpanStore = sampledSpanStore;
    308       this.spanNames = new ArrayList<String>(spanNames);
    309     }
    310 
    311     @Override
    312     public void process() {
    313       sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
    314     }
    315   }
    316 
    317   @Override
    318   public void unregisterSpanNamesForCollection(Collection<String> spanNames) {
    319     eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames));
    320   }
    321 
    322   private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) {
    323     synchronized (samples) {
    324       samples.keySet().removeAll(spanNames);
    325     }
    326   }
    327 
    328   private static final class UnregisterSpanNameEvent implements EventQueue.Entry {
    329     private final InProcessSampledSpanStoreImpl sampledSpanStore;
    330     private final Collection<String> spanNames;
    331 
    332     private UnregisterSpanNameEvent(
    333         InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
    334       this.sampledSpanStore = sampledSpanStore;
    335       this.spanNames = new ArrayList<String>(spanNames);
    336     }
    337 
    338     @Override
    339     public void process() {
    340       sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
    341     }
    342   }
    343 
    344   @Override
    345   public Set<String> getRegisteredSpanNamesForCollection() {
    346     synchronized (samples) {
    347       return Collections.unmodifiableSet(new HashSet<String>(samples.keySet()));
    348     }
    349   }
    350 
    351   @Override
    352   public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
    353     int numSpansToReturn =
    354         filter.getMaxSpansToReturn() == 0
    355             ? MAX_PER_SPAN_NAME_SAMPLES
    356             : filter.getMaxSpansToReturn();
    357     List<RecordEventsSpanImpl> spans = Collections.emptyList();
    358     // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside
    359     // the lock.
    360     synchronized (samples) {
    361       PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
    362       if (perSpanNameSamples != null) {
    363         spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn);
    364       }
    365     }
    366     List<SpanData> ret = new ArrayList<SpanData>(spans.size());
    367     for (RecordEventsSpanImpl span : spans) {
    368       ret.add(span.toSpanData());
    369     }
    370     return Collections.unmodifiableList(ret);
    371   }
    372 
    373   @Override
    374   public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
    375     int numSpansToReturn =
    376         filter.getMaxSpansToReturn() == 0
    377             ? MAX_PER_SPAN_NAME_SAMPLES
    378             : filter.getMaxSpansToReturn();
    379     List<RecordEventsSpanImpl> spans = Collections.emptyList();
    380     // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside
    381     // the lock.
    382     synchronized (samples) {
    383       PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
    384       if (perSpanNameSamples != null) {
    385         spans =
    386             perSpanNameSamples.getLatencySamples(
    387                 filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn);
    388       }
    389     }
    390     List<SpanData> ret = new ArrayList<SpanData>(spans.size());
    391     for (RecordEventsSpanImpl span : spans) {
    392       ret.add(span.toSpanData());
    393     }
    394     return Collections.unmodifiableList(ret);
    395   }
    396 }
    397