1 /* 2 * Copyright 2018, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.implcore.trace.export; 18 19 import com.google.common.collect.EvictingQueue; 20 import io.opencensus.implcore.internal.EventQueue; 21 import io.opencensus.implcore.trace.RecordEventsSpanImpl; 22 import io.opencensus.trace.Status; 23 import io.opencensus.trace.Status.CanonicalCode; 24 import io.opencensus.trace.export.SampledSpanStore; 25 import io.opencensus.trace.export.SpanData; 26 import java.util.ArrayList; 27 import java.util.Collection; 28 import java.util.Collections; 29 import java.util.EnumMap; 30 import java.util.HashMap; 31 import java.util.HashSet; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.Set; 35 import java.util.concurrent.TimeUnit; 36 import javax.annotation.Nullable; 37 import javax.annotation.concurrent.GuardedBy; 38 import javax.annotation.concurrent.ThreadSafe; 39 40 /** In-process implementation of the {@link SampledSpanStore}. */ 41 @ThreadSafe 42 public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl { 43 private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10; 44 private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5; 45 private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1); 46 private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length; 47 // The total number of canonical codes - 1 (the OK code). 48 private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1; 49 private static final int MAX_PER_SPAN_NAME_SAMPLES = 50 NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS 51 + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS; 52 53 // Used to stream the register/unregister events to the implementation to avoid lock contention 54 // between the main threads and the worker thread. 55 private final EventQueue eventQueue; 56 57 @GuardedBy("samples") 58 private final Map<String, PerSpanNameSamples> samples; 59 60 private static final class Bucket { 61 62 private final EvictingQueue<RecordEventsSpanImpl> sampledSpansQueue; 63 private final EvictingQueue<RecordEventsSpanImpl> notSampledSpansQueue; 64 private long lastSampledNanoTime; 65 private long lastNotSampledNanoTime; 66 67 private Bucket(int numSamples) { 68 sampledSpansQueue = EvictingQueue.create(numSamples); 69 notSampledSpansQueue = EvictingQueue.create(numSamples); 70 } 71 72 private void considerForSampling(RecordEventsSpanImpl span) { 73 long spanEndNanoTime = span.getEndNanoTime(); 74 if (span.getContext().getTraceOptions().isSampled()) { 75 // Need to compare by doing the subtraction all the time because in case of an overflow, 76 // this may never sample again (at least for the next ~200 years). No real chance to 77 // overflow two times because that means the process runs for ~200 years. 78 if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) { 79 sampledSpansQueue.add(span); 80 lastSampledNanoTime = spanEndNanoTime; 81 } 82 } else { 83 // Need to compare by doing the subtraction all the time because in case of an overflow, 84 // this may never sample again (at least for the next ~200 years). No real chance to 85 // overflow two times because that means the process runs for ~200 years. 86 if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) { 87 notSampledSpansQueue.add(span); 88 lastNotSampledNanoTime = spanEndNanoTime; 89 } 90 } 91 } 92 93 private void getSamples(int maxSpansToReturn, List<RecordEventsSpanImpl> output) { 94 getSamples(maxSpansToReturn, output, sampledSpansQueue); 95 getSamples(maxSpansToReturn, output, notSampledSpansQueue); 96 } 97 98 private static void getSamples( 99 int maxSpansToReturn, 100 List<RecordEventsSpanImpl> output, 101 EvictingQueue<RecordEventsSpanImpl> queue) { 102 for (RecordEventsSpanImpl span : queue) { 103 if (output.size() >= maxSpansToReturn) { 104 break; 105 } 106 output.add(span); 107 } 108 } 109 110 private void getSamplesFilteredByLatency( 111 long latencyLowerNs, 112 long latencyUpperNs, 113 int maxSpansToReturn, 114 List<RecordEventsSpanImpl> output) { 115 getSamplesFilteredByLatency( 116 latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue); 117 getSamplesFilteredByLatency( 118 latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue); 119 } 120 121 private static void getSamplesFilteredByLatency( 122 long latencyLowerNs, 123 long latencyUpperNs, 124 int maxSpansToReturn, 125 List<RecordEventsSpanImpl> output, 126 EvictingQueue<RecordEventsSpanImpl> queue) { 127 for (RecordEventsSpanImpl span : queue) { 128 if (output.size() >= maxSpansToReturn) { 129 break; 130 } 131 long spanLatencyNs = span.getLatencyNs(); 132 if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) { 133 output.add(span); 134 } 135 } 136 } 137 138 private int getNumSamples() { 139 return sampledSpansQueue.size() + notSampledSpansQueue.size(); 140 } 141 } 142 143 /** 144 * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical 145 * codes other than OK. 146 */ 147 private static final class PerSpanNameSamples { 148 149 private final Bucket[] latencyBuckets; 150 private final Bucket[] errorBuckets; 151 152 private PerSpanNameSamples() { 153 latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS]; 154 for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { 155 latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET); 156 } 157 errorBuckets = new Bucket[NUM_ERROR_BUCKETS]; 158 for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { 159 errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET); 160 } 161 } 162 163 @Nullable 164 private Bucket getLatencyBucket(long latencyNs) { 165 for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { 166 LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; 167 if (latencyNs >= boundaries.getLatencyLowerNs() 168 && latencyNs < boundaries.getLatencyUpperNs()) { 169 return latencyBuckets[i]; 170 } 171 } 172 // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen 173 // in real production because System#nanoTime is monotonic. 174 return null; 175 } 176 177 private Bucket getErrorBucket(CanonicalCode code) { 178 return errorBuckets[code.value() - 1]; 179 } 180 181 private void considerForSampling(RecordEventsSpanImpl span) { 182 Status status = span.getStatus(); 183 // Null status means running Span, this should not happen in production, but the library 184 // should not crash because of this. 185 if (status != null) { 186 Bucket bucket = 187 status.isOk() 188 ? getLatencyBucket(span.getLatencyNs()) 189 : getErrorBucket(status.getCanonicalCode()); 190 // If unable to find the bucket, ignore this Span. 191 if (bucket != null) { 192 bucket.considerForSampling(span); 193 } 194 } 195 } 196 197 private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() { 198 Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries = 199 new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class); 200 for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { 201 latencyBucketSummaries.put( 202 LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples()); 203 } 204 return latencyBucketSummaries; 205 } 206 207 private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() { 208 Map<CanonicalCode, Integer> errorBucketSummaries = 209 new EnumMap<CanonicalCode, Integer>(CanonicalCode.class); 210 for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { 211 errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples()); 212 } 213 return errorBucketSummaries; 214 } 215 216 private List<RecordEventsSpanImpl> getErrorSamples( 217 @Nullable CanonicalCode code, int maxSpansToReturn) { 218 ArrayList<RecordEventsSpanImpl> output = 219 new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn); 220 if (code != null) { 221 getErrorBucket(code).getSamples(maxSpansToReturn, output); 222 } else { 223 for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { 224 errorBuckets[i].getSamples(maxSpansToReturn, output); 225 } 226 } 227 return output; 228 } 229 230 private List<RecordEventsSpanImpl> getLatencySamples( 231 long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) { 232 ArrayList<RecordEventsSpanImpl> output = 233 new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn); 234 for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { 235 LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; 236 if (latencyUpperNs >= boundaries.getLatencyLowerNs() 237 && latencyLowerNs < boundaries.getLatencyUpperNs()) { 238 latencyBuckets[i].getSamplesFilteredByLatency( 239 latencyLowerNs, latencyUpperNs, maxSpansToReturn, output); 240 } 241 } 242 return output; 243 } 244 } 245 246 /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */ 247 InProcessSampledSpanStoreImpl(EventQueue eventQueue) { 248 samples = new HashMap<String, PerSpanNameSamples>(); 249 this.eventQueue = eventQueue; 250 } 251 252 @Override 253 public Summary getSummary() { 254 Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>(); 255 synchronized (samples) { 256 for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) { 257 ret.put( 258 it.getKey(), 259 PerSpanNameSummary.create( 260 it.getValue().getNumbersOfLatencySampledSpans(), 261 it.getValue().getNumbersOfErrorSampledSpans())); 262 } 263 } 264 return Summary.create(ret); 265 } 266 267 @Override 268 public void considerForSampling(RecordEventsSpanImpl span) { 269 synchronized (samples) { 270 String spanName = span.getName(); 271 if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) { 272 samples.put(spanName, new PerSpanNameSamples()); 273 } 274 PerSpanNameSamples perSpanNameSamples = samples.get(spanName); 275 if (perSpanNameSamples != null) { 276 perSpanNameSamples.considerForSampling(span); 277 } 278 } 279 } 280 281 @Override 282 public void registerSpanNamesForCollection(Collection<String> spanNames) { 283 eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames)); 284 } 285 286 @Override 287 protected void shutdown() { 288 eventQueue.shutdown(); 289 } 290 291 private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) { 292 synchronized (samples) { 293 for (String spanName : spanNames) { 294 if (!samples.containsKey(spanName)) { 295 samples.put(spanName, new PerSpanNameSamples()); 296 } 297 } 298 } 299 } 300 301 private static final class RegisterSpanNameEvent implements EventQueue.Entry { 302 private final InProcessSampledSpanStoreImpl sampledSpanStore; 303 private final Collection<String> spanNames; 304 305 private RegisterSpanNameEvent( 306 InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) { 307 this.sampledSpanStore = sampledSpanStore; 308 this.spanNames = new ArrayList<String>(spanNames); 309 } 310 311 @Override 312 public void process() { 313 sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames); 314 } 315 } 316 317 @Override 318 public void unregisterSpanNamesForCollection(Collection<String> spanNames) { 319 eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames)); 320 } 321 322 private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) { 323 synchronized (samples) { 324 samples.keySet().removeAll(spanNames); 325 } 326 } 327 328 private static final class UnregisterSpanNameEvent implements EventQueue.Entry { 329 private final InProcessSampledSpanStoreImpl sampledSpanStore; 330 private final Collection<String> spanNames; 331 332 private UnregisterSpanNameEvent( 333 InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) { 334 this.sampledSpanStore = sampledSpanStore; 335 this.spanNames = new ArrayList<String>(spanNames); 336 } 337 338 @Override 339 public void process() { 340 sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames); 341 } 342 } 343 344 @Override 345 public Set<String> getRegisteredSpanNamesForCollection() { 346 synchronized (samples) { 347 return Collections.unmodifiableSet(new HashSet<String>(samples.keySet())); 348 } 349 } 350 351 @Override 352 public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) { 353 int numSpansToReturn = 354 filter.getMaxSpansToReturn() == 0 355 ? MAX_PER_SPAN_NAME_SAMPLES 356 : filter.getMaxSpansToReturn(); 357 List<RecordEventsSpanImpl> spans = Collections.emptyList(); 358 // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside 359 // the lock. 360 synchronized (samples) { 361 PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); 362 if (perSpanNameSamples != null) { 363 spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn); 364 } 365 } 366 List<SpanData> ret = new ArrayList<SpanData>(spans.size()); 367 for (RecordEventsSpanImpl span : spans) { 368 ret.add(span.toSpanData()); 369 } 370 return Collections.unmodifiableList(ret); 371 } 372 373 @Override 374 public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) { 375 int numSpansToReturn = 376 filter.getMaxSpansToReturn() == 0 377 ? MAX_PER_SPAN_NAME_SAMPLES 378 : filter.getMaxSpansToReturn(); 379 List<RecordEventsSpanImpl> spans = Collections.emptyList(); 380 // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside 381 // the lock. 382 synchronized (samples) { 383 PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); 384 if (perSpanNameSamples != null) { 385 spans = 386 perSpanNameSamples.getLatencySamples( 387 filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn); 388 } 389 } 390 List<SpanData> ret = new ArrayList<SpanData>(spans.size()); 391 for (RecordEventsSpanImpl span : spans) { 392 ret.add(span.toSpanData()); 393 } 394 return Collections.unmodifiableList(ret); 395 } 396 } 397