1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpcpp/server_context.h> 20 21 #include <algorithm> 22 #include <mutex> 23 #include <utility> 24 25 #include <grpc/compression.h> 26 #include <grpc/grpc.h> 27 #include <grpc/load_reporting.h> 28 #include <grpc/support/alloc.h> 29 #include <grpc/support/log.h> 30 #include <grpcpp/completion_queue.h> 31 #include <grpcpp/impl/call.h> 32 #include <grpcpp/support/time.h> 33 34 #include "src/core/lib/surface/call.h" 35 36 namespace grpc { 37 38 // CompletionOp 39 40 class ServerContext::CompletionOp final : public internal::CallOpSetInterface { 41 public: 42 // initial refs: one in the server context, one in the cq 43 CompletionOp() 44 : has_tag_(false), 45 tag_(nullptr), 46 refs_(2), 47 finalized_(false), 48 cancelled_(0) {} 49 50 void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override; 51 bool FinalizeResult(void** tag, bool* status) override; 52 53 bool CheckCancelled(CompletionQueue* cq) { 54 cq->TryPluck(this); 55 return CheckCancelledNoPluck(); 56 } 57 bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } 58 59 void set_tag(void* tag) { 60 has_tag_ = true; 61 tag_ = tag; 62 } 63 64 /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API 65 void* cq_tag() override { return this; } 66 67 void Unref(); 68 69 private: 70 bool CheckCancelledNoPluck() { 71 std::lock_guard<std::mutex> g(mu_); 72 return finalized_ ? (cancelled_ != 0) : false; 73 } 74 75 bool has_tag_; 76 void* tag_; 77 std::mutex mu_; 78 int refs_; 79 bool finalized_; 80 int cancelled_; 81 }; 82 83 void ServerContext::CompletionOp::Unref() { 84 std::unique_lock<std::mutex> lock(mu_); 85 if (--refs_ == 0) { 86 lock.unlock(); 87 delete this; 88 } 89 } 90 91 void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops, 92 size_t* nops) { 93 ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; 94 ops->data.recv_close_on_server.cancelled = &cancelled_; 95 ops->flags = 0; 96 ops->reserved = nullptr; 97 *nops = 1; 98 } 99 100 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { 101 std::unique_lock<std::mutex> lock(mu_); 102 finalized_ = true; 103 bool ret = false; 104 if (has_tag_) { 105 *tag = tag_; 106 ret = true; 107 } 108 if (!*status) cancelled_ = 1; 109 if (--refs_ == 0) { 110 lock.unlock(); 111 delete this; 112 } 113 return ret; 114 } 115 116 // ServerContext body 117 118 ServerContext::ServerContext() 119 : completion_op_(nullptr), 120 has_notify_when_done_tag_(false), 121 async_notify_when_done_tag_(nullptr), 122 deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), 123 call_(nullptr), 124 cq_(nullptr), 125 sent_initial_metadata_(false), 126 compression_level_set_(false), 127 has_pending_ops_(false) {} 128 129 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) 130 : completion_op_(nullptr), 131 has_notify_when_done_tag_(false), 132 async_notify_when_done_tag_(nullptr), 133 deadline_(deadline), 134 call_(nullptr), 135 cq_(nullptr), 136 sent_initial_metadata_(false), 137 compression_level_set_(false), 138 has_pending_ops_(false) { 139 std::swap(*client_metadata_.arr(), *arr); 140 } 141 142 ServerContext::~ServerContext() { 143 if (call_) { 144 grpc_call_unref(call_); 145 } 146 if (completion_op_) { 147 completion_op_->Unref(); 148 } 149 } 150 151 void ServerContext::BeginCompletionOp(internal::Call* call) { 152 GPR_ASSERT(!completion_op_); 153 completion_op_ = new CompletionOp(); 154 if (has_notify_when_done_tag_) { 155 completion_op_->set_tag(async_notify_when_done_tag_); 156 } 157 call->PerformOps(completion_op_); 158 } 159 160 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { 161 return static_cast<internal::CompletionQueueTag*>(completion_op_); 162 } 163 164 void ServerContext::AddInitialMetadata(const grpc::string& key, 165 const grpc::string& value) { 166 initial_metadata_.insert(std::make_pair(key, value)); 167 } 168 169 void ServerContext::AddTrailingMetadata(const grpc::string& key, 170 const grpc::string& value) { 171 trailing_metadata_.insert(std::make_pair(key, value)); 172 } 173 174 void ServerContext::TryCancel() const { 175 grpc_call_error err = grpc_call_cancel_with_status( 176 call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); 177 if (err != GRPC_CALL_OK) { 178 gpr_log(GPR_ERROR, "TryCancel failed with: %d", err); 179 } 180 } 181 182 bool ServerContext::IsCancelled() const { 183 if (has_notify_when_done_tag_) { 184 // when using async API, but the result is only valid 185 // if the tag has already been delivered at the completion queue 186 return completion_op_ && completion_op_->CheckCancelledAsync(); 187 } else { 188 // when using sync API 189 return completion_op_ && completion_op_->CheckCancelled(cq_); 190 } 191 } 192 193 void ServerContext::set_compression_algorithm( 194 grpc_compression_algorithm algorithm) { 195 compression_algorithm_ = algorithm; 196 const char* algorithm_name = nullptr; 197 if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { 198 gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", 199 algorithm); 200 abort(); 201 } 202 GPR_ASSERT(algorithm_name != nullptr); 203 AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name); 204 } 205 206 grpc::string ServerContext::peer() const { 207 grpc::string peer; 208 if (call_) { 209 char* c_peer = grpc_call_get_peer(call_); 210 peer = c_peer; 211 gpr_free(c_peer); 212 } 213 return peer; 214 } 215 216 const struct census_context* ServerContext::census_context() const { 217 return grpc_census_call_get_context(call_); 218 } 219 220 void ServerContext::SetLoadReportingCosts( 221 const std::vector<grpc::string>& cost_data) { 222 if (call_ == nullptr) return; 223 for (const auto& cost_datum : cost_data) { 224 AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum); 225 } 226 } 227 228 } // namespace grpc 229