Home | History | Annotate | Download | only in server
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpcpp/server_context.h>
     20 
     21 #include <algorithm>
     22 #include <mutex>
     23 #include <utility>
     24 
     25 #include <grpc/compression.h>
     26 #include <grpc/grpc.h>
     27 #include <grpc/load_reporting.h>
     28 #include <grpc/support/alloc.h>
     29 #include <grpc/support/log.h>
     30 #include <grpcpp/completion_queue.h>
     31 #include <grpcpp/impl/call.h>
     32 #include <grpcpp/support/time.h>
     33 
     34 #include "src/core/lib/surface/call.h"
     35 
     36 namespace grpc {
     37 
     38 // CompletionOp
     39 
     40 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
     41  public:
     42   // initial refs: one in the server context, one in the cq
     43   CompletionOp()
     44       : has_tag_(false),
     45         tag_(nullptr),
     46         refs_(2),
     47         finalized_(false),
     48         cancelled_(0) {}
     49 
     50   void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override;
     51   bool FinalizeResult(void** tag, bool* status) override;
     52 
     53   bool CheckCancelled(CompletionQueue* cq) {
     54     cq->TryPluck(this);
     55     return CheckCancelledNoPluck();
     56   }
     57   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
     58 
     59   void set_tag(void* tag) {
     60     has_tag_ = true;
     61     tag_ = tag;
     62   }
     63 
     64   /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
     65   void* cq_tag() override { return this; }
     66 
     67   void Unref();
     68 
     69  private:
     70   bool CheckCancelledNoPluck() {
     71     std::lock_guard<std::mutex> g(mu_);
     72     return finalized_ ? (cancelled_ != 0) : false;
     73   }
     74 
     75   bool has_tag_;
     76   void* tag_;
     77   std::mutex mu_;
     78   int refs_;
     79   bool finalized_;
     80   int cancelled_;
     81 };
     82 
     83 void ServerContext::CompletionOp::Unref() {
     84   std::unique_lock<std::mutex> lock(mu_);
     85   if (--refs_ == 0) {
     86     lock.unlock();
     87     delete this;
     88   }
     89 }
     90 
     91 void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops,
     92                                           size_t* nops) {
     93   ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
     94   ops->data.recv_close_on_server.cancelled = &cancelled_;
     95   ops->flags = 0;
     96   ops->reserved = nullptr;
     97   *nops = 1;
     98 }
     99 
    100 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
    101   std::unique_lock<std::mutex> lock(mu_);
    102   finalized_ = true;
    103   bool ret = false;
    104   if (has_tag_) {
    105     *tag = tag_;
    106     ret = true;
    107   }
    108   if (!*status) cancelled_ = 1;
    109   if (--refs_ == 0) {
    110     lock.unlock();
    111     delete this;
    112   }
    113   return ret;
    114 }
    115 
    116 // ServerContext body
    117 
    118 ServerContext::ServerContext()
    119     : completion_op_(nullptr),
    120       has_notify_when_done_tag_(false),
    121       async_notify_when_done_tag_(nullptr),
    122       deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
    123       call_(nullptr),
    124       cq_(nullptr),
    125       sent_initial_metadata_(false),
    126       compression_level_set_(false),
    127       has_pending_ops_(false) {}
    128 
    129 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
    130     : completion_op_(nullptr),
    131       has_notify_when_done_tag_(false),
    132       async_notify_when_done_tag_(nullptr),
    133       deadline_(deadline),
    134       call_(nullptr),
    135       cq_(nullptr),
    136       sent_initial_metadata_(false),
    137       compression_level_set_(false),
    138       has_pending_ops_(false) {
    139   std::swap(*client_metadata_.arr(), *arr);
    140 }
    141 
    142 ServerContext::~ServerContext() {
    143   if (call_) {
    144     grpc_call_unref(call_);
    145   }
    146   if (completion_op_) {
    147     completion_op_->Unref();
    148   }
    149 }
    150 
    151 void ServerContext::BeginCompletionOp(internal::Call* call) {
    152   GPR_ASSERT(!completion_op_);
    153   completion_op_ = new CompletionOp();
    154   if (has_notify_when_done_tag_) {
    155     completion_op_->set_tag(async_notify_when_done_tag_);
    156   }
    157   call->PerformOps(completion_op_);
    158 }
    159 
    160 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
    161   return static_cast<internal::CompletionQueueTag*>(completion_op_);
    162 }
    163 
    164 void ServerContext::AddInitialMetadata(const grpc::string& key,
    165                                        const grpc::string& value) {
    166   initial_metadata_.insert(std::make_pair(key, value));
    167 }
    168 
    169 void ServerContext::AddTrailingMetadata(const grpc::string& key,
    170                                         const grpc::string& value) {
    171   trailing_metadata_.insert(std::make_pair(key, value));
    172 }
    173 
    174 void ServerContext::TryCancel() const {
    175   grpc_call_error err = grpc_call_cancel_with_status(
    176       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
    177   if (err != GRPC_CALL_OK) {
    178     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
    179   }
    180 }
    181 
    182 bool ServerContext::IsCancelled() const {
    183   if (has_notify_when_done_tag_) {
    184     // when using async API, but the result is only valid
    185     // if the tag has already been delivered at the completion queue
    186     return completion_op_ && completion_op_->CheckCancelledAsync();
    187   } else {
    188     // when using sync API
    189     return completion_op_ && completion_op_->CheckCancelled(cq_);
    190   }
    191 }
    192 
    193 void ServerContext::set_compression_algorithm(
    194     grpc_compression_algorithm algorithm) {
    195   compression_algorithm_ = algorithm;
    196   const char* algorithm_name = nullptr;
    197   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
    198     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
    199             algorithm);
    200     abort();
    201   }
    202   GPR_ASSERT(algorithm_name != nullptr);
    203   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
    204 }
    205 
    206 grpc::string ServerContext::peer() const {
    207   grpc::string peer;
    208   if (call_) {
    209     char* c_peer = grpc_call_get_peer(call_);
    210     peer = c_peer;
    211     gpr_free(c_peer);
    212   }
    213   return peer;
    214 }
    215 
    216 const struct census_context* ServerContext::census_context() const {
    217   return grpc_census_call_get_context(call_);
    218 }
    219 
    220 void ServerContext::SetLoadReportingCosts(
    221     const std::vector<grpc::string>& cost_data) {
    222   if (call_ == nullptr) return;
    223   for (const auto& cost_datum : cost_data) {
    224     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
    225   }
    226 }
    227 
    228 }  // namespace grpc
    229