1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/cpp/ext/filters/census/server_filter.h" 22 23 #include "absl/strings/str_cat.h" 24 #include "absl/strings/string_view.h" 25 #include "absl/time/clock.h" 26 #include "absl/time/time.h" 27 #include "opencensus/stats/stats.h" 28 #include "src/core/lib/surface/call.h" 29 #include "src/cpp/ext/filters/census/grpc_plugin.h" 30 #include "src/cpp/ext/filters/census/measures.h" 31 32 namespace grpc { 33 34 constexpr uint32_t CensusServerCallData::kMaxServerStatsLen; 35 36 namespace { 37 38 // server metadata elements 39 struct ServerMetadataElements { 40 grpc_slice path; 41 grpc_slice tracing_slice; 42 grpc_slice census_proto; 43 }; 44 45 void FilterInitialMetadata(grpc_metadata_batch* b, 46 ServerMetadataElements* sml) { 47 if (b->idx.named.path != nullptr) { 48 sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md)); 49 } 50 if (b->idx.named.grpc_trace_bin != nullptr) { 51 sml->tracing_slice = 52 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md)); 53 grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin); 54 } 55 if (b->idx.named.grpc_tags_bin != nullptr) { 56 sml->census_proto = 57 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md)); 58 grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin); 59 } 60 } 61 62 } // namespace 63 64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data, 65 grpc_error* error) { 66 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); 67 CensusServerCallData* calld = 68 reinterpret_cast<CensusServerCallData*>(elem->call_data); 69 CensusChannelData* channeld = 70 reinterpret_cast<CensusChannelData*>(elem->channel_data); 71 GPR_ASSERT(calld != nullptr); 72 GPR_ASSERT(channeld != nullptr); 73 // Stream messages are no longer valid after receiving trailing metadata. 74 if ((*calld->recv_message_) != nullptr) { 75 ++calld->recv_message_count_; 76 } 77 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); 78 } 79 80 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, 81 grpc_error* error) { 82 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data); 83 CensusServerCallData* calld = 84 reinterpret_cast<CensusServerCallData*>(elem->call_data); 85 GPR_ASSERT(calld != nullptr); 86 if (error == GRPC_ERROR_NONE) { 87 grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_; 88 GPR_ASSERT(initial_metadata != nullptr); 89 ServerMetadataElements sml; 90 sml.path = grpc_empty_slice(); 91 sml.tracing_slice = grpc_empty_slice(); 92 sml.census_proto = grpc_empty_slice(); 93 FilterInitialMetadata(initial_metadata, &sml); 94 calld->path_ = grpc_slice_ref_internal(sml.path); 95 calld->method_ = GetMethod(&calld->path_); 96 calld->qualified_method_ = absl::StrCat("Recv.", calld->method_); 97 const char* tracing_str = 98 GRPC_SLICE_IS_EMPTY(sml.tracing_slice) 99 ? "" 100 : reinterpret_cast<const char*>( 101 GRPC_SLICE_START_PTR(sml.tracing_slice)); 102 size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice) 103 ? 0 104 : GRPC_SLICE_LENGTH(sml.tracing_slice); 105 const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto) 106 ? "" 107 : reinterpret_cast<const char*>( 108 GRPC_SLICE_START_PTR(sml.census_proto)); 109 size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto) 110 ? 0 111 : GRPC_SLICE_LENGTH(sml.census_proto); 112 113 GenerateServerContext(absl::string_view(tracing_str, tracing_str_len), 114 absl::string_view(census_str, census_str_len), 115 /*primary_role*/ "", calld->qualified_method_, 116 &calld->context_); 117 118 grpc_slice_unref_internal(sml.tracing_slice); 119 grpc_slice_unref_internal(sml.census_proto); 120 grpc_slice_unref_internal(sml.path); 121 grpc_census_call_set_context( 122 calld->gc_, reinterpret_cast<census_context*>(&calld->context_)); 123 } 124 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_, 125 GRPC_ERROR_REF(error)); 126 } 127 128 void CensusServerCallData::StartTransportStreamOpBatch( 129 grpc_call_element* elem, TransportStreamOpBatch* op) { 130 if (op->recv_initial_metadata() != nullptr) { 131 // substitute our callback for the op callback 132 recv_initial_metadata_ = op->recv_initial_metadata()->batch(); 133 initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready(); 134 op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_); 135 } 136 if (op->send_message() != nullptr) { 137 ++sent_message_count_; 138 } 139 if (op->recv_message() != nullptr) { 140 recv_message_ = op->op()->payload->recv_message.recv_message; 141 initial_on_done_recv_message_ = 142 op->op()->payload->recv_message.recv_message_ready; 143 op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; 144 } 145 // We need to record the time when the trailing metadata was sent to mark the 146 // completeness of the request. 147 if (op->send_trailing_metadata() != nullptr) { 148 elapsed_time_ = absl::Now() - start_time_; 149 size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), 150 stats_buf_, kMaxServerStatsLen); 151 if (len > 0) { 152 GRPC_LOG_IF_ERROR( 153 "census grpc_filter", 154 grpc_metadata_batch_add_tail( 155 op->send_trailing_metadata()->batch(), &census_bin_, 156 grpc_mdelem_from_slices( 157 GRPC_MDSTR_GRPC_SERVER_STATS_BIN, 158 grpc_slice_from_copied_buffer(stats_buf_, len)))); 159 } 160 } 161 // Call next op. 162 grpc_call_next_op(elem, op->op()); 163 } 164 165 grpc_error* CensusServerCallData::Init(grpc_call_element* elem, 166 const grpc_call_element_args* args) { 167 start_time_ = absl::Now(); 168 gc_ = 169 grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0)); 170 GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_, 171 OnDoneRecvInitialMetadataCb, elem, 172 grpc_schedule_on_exec_ctx); 173 GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, 174 grpc_schedule_on_exec_ctx); 175 auth_context_ = grpc_call_auth_context(gc_); 176 return GRPC_ERROR_NONE; 177 } 178 179 void CensusServerCallData::Destroy(grpc_call_element* elem, 180 const grpc_call_final_info* final_info, 181 grpc_closure* then_call_closure) { 182 const uint64_t request_size = GetOutgoingDataSize(final_info); 183 const uint64_t response_size = GetIncomingDataSize(final_info); 184 double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); 185 grpc_auth_context_release(auth_context_); 186 ::opencensus::stats::Record( 187 {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)}, 188 {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)}, 189 {RpcServerServerLatency(), elapsed_time_ms}, 190 {RpcServerSentMessagesPerRpc(), sent_message_count_}, 191 {RpcServerReceivedMessagesPerRpc(), recv_message_count_}}, 192 {{ServerMethodTagKey(), method_}, 193 {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}}); 194 grpc_slice_unref_internal(path_); 195 context_.EndSpan(); 196 } 197 198 } // namespace grpc 199