Home | History | Annotate | Download | only in census
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/cpp/ext/filters/census/server_filter.h"
     22 
     23 #include "absl/strings/str_cat.h"
     24 #include "absl/strings/string_view.h"
     25 #include "absl/time/clock.h"
     26 #include "absl/time/time.h"
     27 #include "opencensus/stats/stats.h"
     28 #include "src/core/lib/surface/call.h"
     29 #include "src/cpp/ext/filters/census/grpc_plugin.h"
     30 #include "src/cpp/ext/filters/census/measures.h"
     31 
     32 namespace grpc {
     33 
     34 constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
     35 
     36 namespace {
     37 
     38 // server metadata elements
     39 struct ServerMetadataElements {
     40   grpc_slice path;
     41   grpc_slice tracing_slice;
     42   grpc_slice census_proto;
     43 };
     44 
     45 void FilterInitialMetadata(grpc_metadata_batch* b,
     46                            ServerMetadataElements* sml) {
     47   if (b->idx.named.path != nullptr) {
     48     sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
     49   }
     50   if (b->idx.named.grpc_trace_bin != nullptr) {
     51     sml->tracing_slice =
     52         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
     53     grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
     54   }
     55   if (b->idx.named.grpc_tags_bin != nullptr) {
     56     sml->census_proto =
     57         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
     58     grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
     59   }
     60 }
     61 
     62 }  // namespace
     63 
     64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
     65                                                grpc_error* error) {
     66   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
     67   CensusServerCallData* calld =
     68       reinterpret_cast<CensusServerCallData*>(elem->call_data);
     69   CensusChannelData* channeld =
     70       reinterpret_cast<CensusChannelData*>(elem->channel_data);
     71   GPR_ASSERT(calld != nullptr);
     72   GPR_ASSERT(channeld != nullptr);
     73   // Stream messages are no longer valid after receiving trailing metadata.
     74   if ((*calld->recv_message_) != nullptr) {
     75     ++calld->recv_message_count_;
     76   }
     77   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
     78 }
     79 
     80 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
     81                                                        grpc_error* error) {
     82   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
     83   CensusServerCallData* calld =
     84       reinterpret_cast<CensusServerCallData*>(elem->call_data);
     85   GPR_ASSERT(calld != nullptr);
     86   if (error == GRPC_ERROR_NONE) {
     87     grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
     88     GPR_ASSERT(initial_metadata != nullptr);
     89     ServerMetadataElements sml;
     90     sml.path = grpc_empty_slice();
     91     sml.tracing_slice = grpc_empty_slice();
     92     sml.census_proto = grpc_empty_slice();
     93     FilterInitialMetadata(initial_metadata, &sml);
     94     calld->path_ = grpc_slice_ref_internal(sml.path);
     95     calld->method_ = GetMethod(&calld->path_);
     96     calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
     97     const char* tracing_str =
     98         GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
     99             ? ""
    100             : reinterpret_cast<const char*>(
    101                   GRPC_SLICE_START_PTR(sml.tracing_slice));
    102     size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
    103                                  ? 0
    104                                  : GRPC_SLICE_LENGTH(sml.tracing_slice);
    105     const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
    106                                  ? ""
    107                                  : reinterpret_cast<const char*>(
    108                                        GRPC_SLICE_START_PTR(sml.census_proto));
    109     size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
    110                                 ? 0
    111                                 : GRPC_SLICE_LENGTH(sml.census_proto);
    112 
    113     GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
    114                           absl::string_view(census_str, census_str_len),
    115                           /*primary_role*/ "", calld->qualified_method_,
    116                           &calld->context_);
    117 
    118     grpc_slice_unref_internal(sml.tracing_slice);
    119     grpc_slice_unref_internal(sml.census_proto);
    120     grpc_slice_unref_internal(sml.path);
    121     grpc_census_call_set_context(
    122         calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
    123   }
    124   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
    125                    GRPC_ERROR_REF(error));
    126 }
    127 
    128 void CensusServerCallData::StartTransportStreamOpBatch(
    129     grpc_call_element* elem, TransportStreamOpBatch* op) {
    130   if (op->recv_initial_metadata() != nullptr) {
    131     // substitute our callback for the op callback
    132     recv_initial_metadata_ = op->recv_initial_metadata()->batch();
    133     initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
    134     op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
    135   }
    136   if (op->send_message() != nullptr) {
    137     ++sent_message_count_;
    138   }
    139   if (op->recv_message() != nullptr) {
    140     recv_message_ = op->op()->payload->recv_message.recv_message;
    141     initial_on_done_recv_message_ =
    142         op->op()->payload->recv_message.recv_message_ready;
    143     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
    144   }
    145   // We need to record the time when the trailing metadata was sent to mark the
    146   // completeness of the request.
    147   if (op->send_trailing_metadata() != nullptr) {
    148     elapsed_time_ = absl::Now() - start_time_;
    149     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
    150                                       stats_buf_, kMaxServerStatsLen);
    151     if (len > 0) {
    152       GRPC_LOG_IF_ERROR(
    153           "census grpc_filter",
    154           grpc_metadata_batch_add_tail(
    155               op->send_trailing_metadata()->batch(), &census_bin_,
    156               grpc_mdelem_from_slices(
    157                   GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
    158                   grpc_slice_from_copied_buffer(stats_buf_, len))));
    159     }
    160   }
    161   // Call next op.
    162   grpc_call_next_op(elem, op->op());
    163 }
    164 
    165 grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
    166                                        const grpc_call_element_args* args) {
    167   start_time_ = absl::Now();
    168   gc_ =
    169       grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
    170   GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
    171                     OnDoneRecvInitialMetadataCb, elem,
    172                     grpc_schedule_on_exec_ctx);
    173   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
    174                     grpc_schedule_on_exec_ctx);
    175   auth_context_ = grpc_call_auth_context(gc_);
    176   return GRPC_ERROR_NONE;
    177 }
    178 
    179 void CensusServerCallData::Destroy(grpc_call_element* elem,
    180                                    const grpc_call_final_info* final_info,
    181                                    grpc_closure* then_call_closure) {
    182   const uint64_t request_size = GetOutgoingDataSize(final_info);
    183   const uint64_t response_size = GetIncomingDataSize(final_info);
    184   double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
    185   grpc_auth_context_release(auth_context_);
    186   ::opencensus::stats::Record(
    187       {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
    188        {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
    189        {RpcServerServerLatency(), elapsed_time_ms},
    190        {RpcServerSentMessagesPerRpc(), sent_message_count_},
    191        {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
    192       {{ServerMethodTagKey(), method_},
    193        {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
    194   grpc_slice_unref_internal(path_);
    195   context_.EndSpan();
    196 }
    197 
    198 }  // namespace grpc
    199