1 /* 2 * 3 * Copyright 2015-2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <fstream> 20 #include <memory> 21 #include <sstream> 22 #include <thread> 23 24 #include <gflags/gflags.h> 25 #include <grpc/grpc.h> 26 #include <grpc/support/log.h> 27 #include <grpc/support/time.h> 28 #include <grpcpp/security/server_credentials.h> 29 #include <grpcpp/server.h> 30 #include <grpcpp/server_builder.h> 31 #include <grpcpp/server_context.h> 32 33 #include "src/core/lib/gpr/string.h" 34 #include "src/core/lib/transport/byte_stream.h" 35 #include "src/proto/grpc/testing/empty.pb.h" 36 #include "src/proto/grpc/testing/messages.pb.h" 37 #include "src/proto/grpc/testing/test.grpc.pb.h" 38 #include "test/cpp/interop/server_helper.h" 39 #include "test/cpp/util/test_config.h" 40 41 DEFINE_bool(use_alts, false, 42 "Whether to use alts. Enable alts will disable tls."); 43 DEFINE_bool(use_tls, false, "Whether to use tls."); 44 DEFINE_string(custom_credentials_type, "", "User provided credentials type."); 45 DEFINE_int32(port, 0, "Server port."); 46 DEFINE_int32(max_send_message_size, -1, "The maximum send message size."); 47 48 using grpc::Server; 49 using grpc::ServerBuilder; 50 using grpc::ServerContext; 51 using grpc::ServerCredentials; 52 using grpc::ServerReader; 53 using grpc::ServerReaderWriter; 54 using grpc::ServerWriter; 55 using grpc::SslServerCredentialsOptions; 56 using grpc::Status; 57 using grpc::WriteOptions; 58 using grpc::testing::InteropServerContextInspector; 59 using grpc::testing::Payload; 60 using grpc::testing::SimpleRequest; 61 using grpc::testing::SimpleResponse; 62 using grpc::testing::StreamingInputCallRequest; 63 using grpc::testing::StreamingInputCallResponse; 64 using grpc::testing::StreamingOutputCallRequest; 65 using grpc::testing::StreamingOutputCallResponse; 66 using grpc::testing::TestService; 67 68 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; 69 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; 70 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent"; 71 72 void MaybeEchoMetadata(ServerContext* context) { 73 const auto& client_metadata = context->client_metadata(); 74 GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1); 75 GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1); 76 77 auto iter = client_metadata.find(kEchoInitialMetadataKey); 78 if (iter != client_metadata.end()) { 79 context->AddInitialMetadata( 80 kEchoInitialMetadataKey, 81 grpc::string(iter->second.begin(), iter->second.end())); 82 } 83 iter = client_metadata.find(kEchoTrailingBinMetadataKey); 84 if (iter != client_metadata.end()) { 85 context->AddTrailingMetadata( 86 kEchoTrailingBinMetadataKey, 87 grpc::string(iter->second.begin(), iter->second.end())); 88 } 89 // Check if client sent a magic key in the header that makes us echo 90 // back the user-agent (for testing purpose) 91 iter = client_metadata.find(kEchoUserAgentKey); 92 if (iter != client_metadata.end()) { 93 iter = client_metadata.find("user-agent"); 94 if (iter != client_metadata.end()) { 95 context->AddInitialMetadata( 96 kEchoUserAgentKey, 97 grpc::string(iter->second.begin(), iter->second.end())); 98 } 99 } 100 } 101 102 bool SetPayload(int size, Payload* payload) { 103 std::unique_ptr<char[]> body(new char[size]()); 104 payload->set_body(body.get(), size); 105 return true; 106 } 107 108 bool CheckExpectedCompression(const ServerContext& context, 109 const bool compression_expected) { 110 const InteropServerContextInspector inspector(context); 111 const grpc_compression_algorithm received_compression = 112 inspector.GetCallCompressionAlgorithm(); 113 114 if (compression_expected) { 115 if (received_compression == GRPC_COMPRESS_NONE) { 116 // Expected some compression, got NONE. This is an error. 117 gpr_log(GPR_ERROR, 118 "Expected compression but got uncompressed request from client."); 119 return false; 120 } 121 if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { 122 gpr_log(GPR_ERROR, 123 "Failure: Requested compression in a compressable request, but " 124 "compression bit in message flags not set."); 125 return false; 126 } 127 } else { 128 // Didn't expect compression -> make sure the request is uncompressed 129 if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { 130 gpr_log(GPR_ERROR, 131 "Failure: Didn't requested compression, but compression bit in " 132 "message flags set."); 133 return false; 134 } 135 } 136 return true; 137 } 138 139 class TestServiceImpl : public TestService::Service { 140 public: 141 Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request, 142 grpc::testing::Empty* response) { 143 MaybeEchoMetadata(context); 144 return Status::OK; 145 } 146 147 // Response contains current timestamp. We ignore everything in the request. 148 Status CacheableUnaryCall(ServerContext* context, 149 const SimpleRequest* request, 150 SimpleResponse* response) { 151 gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE); 152 std::string timestamp = std::to_string((long long unsigned)ts.tv_nsec); 153 response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size()); 154 context->AddInitialMetadata("cache-control", "max-age=60, public"); 155 return Status::OK; 156 } 157 158 Status UnaryCall(ServerContext* context, const SimpleRequest* request, 159 SimpleResponse* response) { 160 MaybeEchoMetadata(context); 161 if (request->has_response_compressed()) { 162 const bool compression_requested = request->response_compressed().value(); 163 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", 164 compression_requested ? "enabled" : "disabled", __func__); 165 if (compression_requested) { 166 // Any level would do, let's go for HIGH because we are overachievers. 167 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); 168 } else { 169 context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE); 170 } 171 } 172 if (!CheckExpectedCompression(*context, 173 request->expect_compressed().value())) { 174 return Status(grpc::StatusCode::INVALID_ARGUMENT, 175 "Compressed request expectation not met."); 176 } 177 if (request->response_size() > 0) { 178 if (!SetPayload(request->response_size(), response->mutable_payload())) { 179 return Status(grpc::StatusCode::INVALID_ARGUMENT, 180 "Error creating payload."); 181 } 182 } 183 184 if (request->has_response_status()) { 185 return Status( 186 static_cast<grpc::StatusCode>(request->response_status().code()), 187 request->response_status().message()); 188 } 189 190 return Status::OK; 191 } 192 193 Status StreamingOutputCall( 194 ServerContext* context, const StreamingOutputCallRequest* request, 195 ServerWriter<StreamingOutputCallResponse>* writer) { 196 StreamingOutputCallResponse response; 197 bool write_success = true; 198 for (int i = 0; write_success && i < request->response_parameters_size(); 199 i++) { 200 if (!SetPayload(request->response_parameters(i).size(), 201 response.mutable_payload())) { 202 return Status(grpc::StatusCode::INVALID_ARGUMENT, 203 "Error creating payload."); 204 } 205 WriteOptions wopts; 206 if (request->response_parameters(i).has_compressed()) { 207 // Compress by default. Disabled on a per-message basis. 208 context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); 209 const bool compression_requested = 210 request->response_parameters(i).compressed().value(); 211 gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", 212 compression_requested ? "enabled" : "disabled", __func__); 213 if (!compression_requested) { 214 wopts.set_no_compression(); 215 } // else, compression is already enabled via the context. 216 } 217 int time_us; 218 if ((time_us = request->response_parameters(i).interval_us()) > 0) { 219 // Sleep before response if needed 220 gpr_timespec sleep_time = 221 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), 222 gpr_time_from_micros(time_us, GPR_TIMESPAN)); 223 gpr_sleep_until(sleep_time); 224 } 225 write_success = writer->Write(response, wopts); 226 } 227 if (write_success) { 228 return Status::OK; 229 } else { 230 return Status(grpc::StatusCode::INTERNAL, "Error writing response."); 231 } 232 } 233 234 Status StreamingInputCall(ServerContext* context, 235 ServerReader<StreamingInputCallRequest>* reader, 236 StreamingInputCallResponse* response) { 237 StreamingInputCallRequest request; 238 int aggregated_payload_size = 0; 239 while (reader->Read(&request)) { 240 if (!CheckExpectedCompression(*context, 241 request.expect_compressed().value())) { 242 return Status(grpc::StatusCode::INVALID_ARGUMENT, 243 "Compressed request expectation not met."); 244 } 245 if (request.has_payload()) { 246 aggregated_payload_size += request.payload().body().size(); 247 } 248 } 249 response->set_aggregated_payload_size(aggregated_payload_size); 250 return Status::OK; 251 } 252 253 Status FullDuplexCall( 254 ServerContext* context, 255 ServerReaderWriter<StreamingOutputCallResponse, 256 StreamingOutputCallRequest>* stream) { 257 MaybeEchoMetadata(context); 258 StreamingOutputCallRequest request; 259 StreamingOutputCallResponse response; 260 bool write_success = true; 261 while (write_success && stream->Read(&request)) { 262 if (request.has_response_status()) { 263 return Status( 264 static_cast<grpc::StatusCode>(request.response_status().code()), 265 request.response_status().message()); 266 } 267 if (request.response_parameters_size() != 0) { 268 response.mutable_payload()->set_type(request.payload().type()); 269 response.mutable_payload()->set_body( 270 grpc::string(request.response_parameters(0).size(), '\0')); 271 int time_us; 272 if ((time_us = request.response_parameters(0).interval_us()) > 0) { 273 // Sleep before response if needed 274 gpr_timespec sleep_time = 275 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), 276 gpr_time_from_micros(time_us, GPR_TIMESPAN)); 277 gpr_sleep_until(sleep_time); 278 } 279 write_success = stream->Write(response); 280 } 281 } 282 if (write_success) { 283 return Status::OK; 284 } else { 285 return Status(grpc::StatusCode::INTERNAL, "Error writing response."); 286 } 287 } 288 289 Status HalfDuplexCall( 290 ServerContext* context, 291 ServerReaderWriter<StreamingOutputCallResponse, 292 StreamingOutputCallRequest>* stream) { 293 std::vector<StreamingOutputCallRequest> requests; 294 StreamingOutputCallRequest request; 295 while (stream->Read(&request)) { 296 requests.push_back(request); 297 } 298 299 StreamingOutputCallResponse response; 300 bool write_success = true; 301 for (unsigned int i = 0; write_success && i < requests.size(); i++) { 302 response.mutable_payload()->set_type(requests[i].payload().type()); 303 if (requests[i].response_parameters_size() == 0) { 304 return Status(grpc::StatusCode::INTERNAL, 305 "Request does not have response parameters."); 306 } 307 response.mutable_payload()->set_body( 308 grpc::string(requests[i].response_parameters(0).size(), '\0')); 309 write_success = stream->Write(response); 310 } 311 if (write_success) { 312 return Status::OK; 313 } else { 314 return Status(grpc::StatusCode::INTERNAL, "Error writing response."); 315 } 316 } 317 }; 318 319 void grpc::testing::interop::RunServer( 320 const std::shared_ptr<ServerCredentials>& creds) { 321 RunServer(creds, FLAGS_port, nullptr, nullptr); 322 } 323 324 void grpc::testing::interop::RunServer( 325 const std::shared_ptr<ServerCredentials>& creds, 326 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> 327 server_options) { 328 RunServer(creds, FLAGS_port, nullptr, std::move(server_options)); 329 } 330 331 void grpc::testing::interop::RunServer( 332 const std::shared_ptr<ServerCredentials>& creds, const int port, 333 ServerStartedCondition* server_started_condition) { 334 RunServer(creds, port, server_started_condition, nullptr); 335 } 336 337 void grpc::testing::interop::RunServer( 338 const std::shared_ptr<ServerCredentials>& creds, const int port, 339 ServerStartedCondition* server_started_condition, 340 std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> 341 server_options) { 342 GPR_ASSERT(port != 0); 343 std::ostringstream server_address; 344 server_address << "0.0.0.0:" << port; 345 TestServiceImpl service; 346 347 SimpleRequest request; 348 SimpleResponse response; 349 350 ServerBuilder builder; 351 builder.RegisterService(&service); 352 builder.AddListeningPort(server_address.str(), creds); 353 if (server_options != nullptr) { 354 for (size_t i = 0; i < server_options->size(); i++) { 355 builder.SetOption(std::move((*server_options)[i])); 356 } 357 } 358 if (FLAGS_max_send_message_size >= 0) { 359 builder.SetMaxSendMessageSize(FLAGS_max_send_message_size); 360 } 361 std::unique_ptr<Server> server(builder.BuildAndStart()); 362 gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); 363 364 // Signal that the server has started. 365 if (server_started_condition) { 366 std::unique_lock<std::mutex> lock(server_started_condition->mutex); 367 server_started_condition->server_started = true; 368 server_started_condition->condition.notify_all(); 369 } 370 371 while (!gpr_atm_no_barrier_load(&g_got_sigint)) { 372 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), 373 gpr_time_from_seconds(5, GPR_TIMESPAN))); 374 } 375 } 376