Home | History | Annotate | Download | only in ocagent
      1 /*
      2  * Copyright 2018, OpenCensus Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.opencensus.exporter.trace.ocagent;
     18 
      import com.google.common.util.concurrent.MoreExecutors;
      import io.grpc.Server;
      import io.grpc.ServerBuilder;
      import io.grpc.netty.NettyServerBuilder;
      import io.grpc.stub.StreamObserver;
      import io.opencensus.proto.agent.trace.v1.CurrentLibraryConfig;
      import io.opencensus.proto.agent.trace.v1.ExportTraceServiceRequest;
      import io.opencensus.proto.agent.trace.v1.ExportTraceServiceResponse;
      import io.opencensus.proto.agent.trace.v1.TraceServiceGrpc;
      import io.opencensus.proto.agent.trace.v1.UpdatedLibraryConfig;
      import io.opencensus.proto.trace.v1.ConstantSampler;
      import io.opencensus.proto.trace.v1.TraceConfig;
      import java.io.IOException;
      import java.net.InetSocketAddress;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.concurrent.CopyOnWriteArrayList;
      import java.util.concurrent.Executor;
      import java.util.concurrent.atomic.AtomicReference;
      import java.util.logging.Logger;
      import javax.annotation.Nullable;

     40 
     41 /** Fake implementation of {@link TraceServiceGrpc}. */
     42 final class FakeOcAgentTraceServiceGrpcImpl extends TraceServiceGrpc.TraceServiceImplBase {
     43 
     44   private static final Logger logger =
     45       Logger.getLogger(FakeOcAgentTraceServiceGrpcImpl.class.getName());
     46 
     47   // Default updatedLibraryConfig uses an always sampler.
     48   private UpdatedLibraryConfig updatedLibraryConfig =
     49       UpdatedLibraryConfig.newBuilder()
     50           .setConfig(
     51               TraceConfig.newBuilder()
     52                   .setConstantSampler(ConstantSampler.newBuilder().setDecision(true).build())
     53                   .build())
     54           .build();
     55 
     56   private final List<CurrentLibraryConfig> currentLibraryConfigs = new ArrayList<>();
     57   private final List<ExportTraceServiceRequest> exportTraceServiceRequests = new ArrayList<>();
     58 
     59   private final AtomicReference<StreamObserver<UpdatedLibraryConfig>> updatedConfigObserverRef =
     60       new AtomicReference<>();
     61 
     62   private final StreamObserver<CurrentLibraryConfig> currentConfigObserver =
     63       new StreamObserver<CurrentLibraryConfig>() {
     64         @Override
     65         public void onNext(CurrentLibraryConfig value) {
     66           currentLibraryConfigs.add(value);
     67           @Nullable
     68           StreamObserver<UpdatedLibraryConfig> updatedConfigObserver =
     69               updatedConfigObserverRef.get();
     70           if (updatedConfigObserver != null) {
     71             updatedConfigObserver.onNext(updatedLibraryConfig);
     72           }
     73         }
     74 
     75         @Override
     76         public void onError(Throwable t) {
     77           logger.warning("Exception thrown for config stream: " + t);
     78         }
     79 
     80         @Override
     81         public void onCompleted() {}
     82       };
     83 
     84   private final StreamObserver<ExportTraceServiceRequest> exportRequestObserver =
     85       new StreamObserver<ExportTraceServiceRequest>() {
     86         @Override
     87         public void onNext(ExportTraceServiceRequest value) {
     88           exportTraceServiceRequests.add(value);
     89         }
     90 
     91         @Override
     92         public void onError(Throwable t) {
     93           logger.warning("Exception thrown for export stream: " + t);
     94         }
     95 
     96         @Override
     97         public void onCompleted() {}
     98       };
     99 
    100   @Override
    101   public StreamObserver<CurrentLibraryConfig> config(
    102       StreamObserver<UpdatedLibraryConfig> updatedLibraryConfigStreamObserver) {
    103     updatedConfigObserverRef.set(updatedLibraryConfigStreamObserver);
    104     return currentConfigObserver;
    105   }
    106 
    107   @Override
    108   public StreamObserver<ExportTraceServiceRequest> export(
    109       StreamObserver<ExportTraceServiceResponse> exportTraceServiceResponseStreamObserver) {
    110     return exportRequestObserver;
    111   }
    112 
    113   // Returns the stored CurrentLibraryConfigs.
    114   List<CurrentLibraryConfig> getCurrentLibraryConfigs() {
    115     return Collections.unmodifiableList(currentLibraryConfigs);
    116   }
    117 
    118   // Returns the stored ExportTraceServiceRequests.
    119   List<ExportTraceServiceRequest> getExportTraceServiceRequests() {
    120     return Collections.unmodifiableList(exportTraceServiceRequests);
    121   }
    122 
    123   // Sets the UpdatedLibraryConfig that will be passed to client.
    124   void setUpdatedLibraryConfig(UpdatedLibraryConfig updatedLibraryConfig) {
    125     this.updatedLibraryConfig = updatedLibraryConfig;
    126   }
    127 
    128   // Gets the UpdatedLibraryConfig that will be passed to client.
    129   UpdatedLibraryConfig getUpdatedLibraryConfig() {
    130     return updatedLibraryConfig;
    131   }
    132 
    133   static void startServer(String endPoint) throws IOException {
    134     ServerBuilder<?> builder = NettyServerBuilder.forAddress(parseEndpoint(endPoint));
    135     Executor executor = MoreExecutors.directExecutor();
    136     builder.executor(executor);
    137     final Server server = builder.addService(new FakeOcAgentTraceServiceGrpcImpl()).build();
    138     server.start();
    139     logger.info("Server started at " + endPoint);
    140 
    141     Runtime.getRuntime()
    142         .addShutdownHook(
    143             new Thread() {
    144               @Override
    145               public void run() {
    146                 server.shutdown();
    147               }
    148             });
    149 
    150     try {
    151       server.awaitTermination();
    152     } catch (InterruptedException e) {
    153       logger.warning("Thread interrupted: " + e.getMessage());
    154       Thread.currentThread().interrupt();
    155     }
    156   }
    157 
    158   private static InetSocketAddress parseEndpoint(String endPoint) {
    159     try {
    160       int colonIndex = endPoint.indexOf(":");
    161       String host = endPoint.substring(0, colonIndex);
    162       int port = Integer.parseInt(endPoint.substring(colonIndex + 1));
    163       return new InetSocketAddress(host, port);
    164     } catch (RuntimeException e) {
    165       logger.warning("Unexpected format of end point: " + endPoint + ", use default end point.");
    166       return new InetSocketAddress("localhost", 55678);
    167     }
    168   }
    169 }
    170