1 /* 2 * Copyright 2018, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.exporter.trace.ocagent; 18 19 import com.google.common.util.concurrent.MoreExecutors; 20 import io.grpc.Server; 21 import io.grpc.ServerBuilder; 22 import io.grpc.netty.NettyServerBuilder; 23 import io.grpc.stub.StreamObserver; 24 import io.opencensus.proto.agent.trace.v1.CurrentLibraryConfig; 25 import io.opencensus.proto.agent.trace.v1.ExportTraceServiceRequest; 26 import io.opencensus.proto.agent.trace.v1.ExportTraceServiceResponse; 27 import io.opencensus.proto.agent.trace.v1.TraceServiceGrpc; 28 import io.opencensus.proto.agent.trace.v1.UpdatedLibraryConfig; 29 import io.opencensus.proto.trace.v1.ConstantSampler; 30 import io.opencensus.proto.trace.v1.TraceConfig; 31 import java.io.IOException; 32 import java.net.InetSocketAddress; 33 import java.util.ArrayList; 34 import java.util.Collections; 35 import java.util.List; 36 import java.util.concurrent.Executor; 37 import java.util.concurrent.atomic.AtomicReference; 38 import java.util.logging.Logger; 39 import javax.annotation.Nullable; 40 41 /** Fake implementation of {@link TraceServiceGrpc}. */ 42 final class FakeOcAgentTraceServiceGrpcImpl extends TraceServiceGrpc.TraceServiceImplBase { 43 44 private static final Logger logger = 45 Logger.getLogger(FakeOcAgentTraceServiceGrpcImpl.class.getName()); 46 47 // Default updatedLibraryConfig uses an always sampler. 48 private UpdatedLibraryConfig updatedLibraryConfig = 49 UpdatedLibraryConfig.newBuilder() 50 .setConfig( 51 TraceConfig.newBuilder() 52 .setConstantSampler(ConstantSampler.newBuilder().setDecision(true).build()) 53 .build()) 54 .build(); 55 56 private final List<CurrentLibraryConfig> currentLibraryConfigs = new ArrayList<>(); 57 private final List<ExportTraceServiceRequest> exportTraceServiceRequests = new ArrayList<>(); 58 59 private final AtomicReference<StreamObserver<UpdatedLibraryConfig>> updatedConfigObserverRef = 60 new AtomicReference<>(); 61 62 private final StreamObserver<CurrentLibraryConfig> currentConfigObserver = 63 new StreamObserver<CurrentLibraryConfig>() { 64 @Override 65 public void onNext(CurrentLibraryConfig value) { 66 currentLibraryConfigs.add(value); 67 @Nullable 68 StreamObserver<UpdatedLibraryConfig> updatedConfigObserver = 69 updatedConfigObserverRef.get(); 70 if (updatedConfigObserver != null) { 71 updatedConfigObserver.onNext(updatedLibraryConfig); 72 } 73 } 74 75 @Override 76 public void onError(Throwable t) { 77 logger.warning("Exception thrown for config stream: " + t); 78 } 79 80 @Override 81 public void onCompleted() {} 82 }; 83 84 private final StreamObserver<ExportTraceServiceRequest> exportRequestObserver = 85 new StreamObserver<ExportTraceServiceRequest>() { 86 @Override 87 public void onNext(ExportTraceServiceRequest value) { 88 exportTraceServiceRequests.add(value); 89 } 90 91 @Override 92 public void onError(Throwable t) { 93 logger.warning("Exception thrown for export stream: " + t); 94 } 95 96 @Override 97 public void onCompleted() {} 98 }; 99 100 @Override 101 public StreamObserver<CurrentLibraryConfig> config( 102 StreamObserver<UpdatedLibraryConfig> updatedLibraryConfigStreamObserver) { 103 updatedConfigObserverRef.set(updatedLibraryConfigStreamObserver); 104 return currentConfigObserver; 105 } 106 107 @Override 108 public StreamObserver<ExportTraceServiceRequest> export( 109 StreamObserver<ExportTraceServiceResponse> exportTraceServiceResponseStreamObserver) { 110 return exportRequestObserver; 111 } 112 113 // Returns the stored CurrentLibraryConfigs. 114 List<CurrentLibraryConfig> getCurrentLibraryConfigs() { 115 return Collections.unmodifiableList(currentLibraryConfigs); 116 } 117 118 // Returns the stored ExportTraceServiceRequests. 119 List<ExportTraceServiceRequest> getExportTraceServiceRequests() { 120 return Collections.unmodifiableList(exportTraceServiceRequests); 121 } 122 123 // Sets the UpdatedLibraryConfig that will be passed to client. 124 void setUpdatedLibraryConfig(UpdatedLibraryConfig updatedLibraryConfig) { 125 this.updatedLibraryConfig = updatedLibraryConfig; 126 } 127 128 // Gets the UpdatedLibraryConfig that will be passed to client. 129 UpdatedLibraryConfig getUpdatedLibraryConfig() { 130 return updatedLibraryConfig; 131 } 132 133 static void startServer(String endPoint) throws IOException { 134 ServerBuilder<?> builder = NettyServerBuilder.forAddress(parseEndpoint(endPoint)); 135 Executor executor = MoreExecutors.directExecutor(); 136 builder.executor(executor); 137 final Server server = builder.addService(new FakeOcAgentTraceServiceGrpcImpl()).build(); 138 server.start(); 139 logger.info("Server started at " + endPoint); 140 141 Runtime.getRuntime() 142 .addShutdownHook( 143 new Thread() { 144 @Override 145 public void run() { 146 server.shutdown(); 147 } 148 }); 149 150 try { 151 server.awaitTermination(); 152 } catch (InterruptedException e) { 153 logger.warning("Thread interrupted: " + e.getMessage()); 154 Thread.currentThread().interrupt(); 155 } 156 } 157 158 private static InetSocketAddress parseEndpoint(String endPoint) { 159 try { 160 int colonIndex = endPoint.indexOf(":"); 161 String host = endPoint.substring(0, colonIndex); 162 int port = Integer.parseInt(endPoint.substring(colonIndex + 1)); 163 return new InetSocketAddress(host, port); 164 } catch (RuntimeException e) { 165 logger.warning("Unexpected format of end point: " + endPoint + ", use default end point."); 166 return new InetSocketAddress("localhost", 55678); 167 } 168 } 169 } 170