1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "pb_decode.h" 22 #include "pb_encode.h" 23 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" 24 25 #include <grpc/support/alloc.h> 26 27 /* invoked once for every Server in ServerList */ 28 static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field, 29 void** arg) { 30 grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(*arg); 31 grpc_grpclb_server server; 32 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) { 33 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); 34 return false; 35 } 36 ++sl->num_servers; 37 return true; 38 } 39 40 typedef struct decode_serverlist_arg { 41 /* The decoding callback is invoked once per server in serverlist. Remember 42 * which index of the serverlist are we currently decoding */ 43 size_t decoding_idx; 44 /* The decoded serverlist */ 45 grpc_grpclb_serverlist* serverlist; 46 } decode_serverlist_arg; 47 48 /* invoked once for every Server in ServerList */ 49 static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field, 50 void** arg) { 51 decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg); 52 GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); 53 grpc_grpclb_server* server = 54 static_cast<grpc_grpclb_server*>(gpr_zalloc(sizeof(grpc_grpclb_server))); 55 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) { 56 gpr_free(server); 57 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); 58 return false; 59 } 60 dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; 61 return true; 62 } 63 64 grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name) { 65 grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>( 66 gpr_malloc(sizeof(grpc_grpclb_request))); 67 req->has_client_stats = false; 68 req->has_initial_request = true; 69 req->initial_request.has_name = true; 70 strncpy(req->initial_request.name, lb_service_name, 71 GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); 72 return req; 73 } 74 75 static void populate_timestamp(gpr_timespec timestamp, 76 grpc_grpclb_timestamp* timestamp_pb) { 77 timestamp_pb->has_seconds = true; 78 timestamp_pb->seconds = timestamp.tv_sec; 79 timestamp_pb->has_nanos = true; 80 timestamp_pb->nanos = timestamp.tv_nsec; 81 } 82 83 static bool encode_string(pb_ostream_t* stream, const pb_field_t* field, 84 void* const* arg) { 85 char* str = static_cast<char*>(*arg); 86 if (!pb_encode_tag_for_field(stream, field)) return false; 87 return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str)); 88 } 89 90 static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field, 91 void* const* arg) { 92 grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries = 93 static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg); 94 if (drop_entries == nullptr) return true; 95 for (size_t i = 0; i < drop_entries->size(); ++i) { 96 if (!pb_encode_tag_for_field(stream, field)) return false; 97 grpc_lb_v1_ClientStatsPerToken drop_message; 98 drop_message.load_balance_token.funcs.encode = encode_string; 99 drop_message.load_balance_token.arg = (*drop_entries)[i].token.get(); 100 drop_message.has_num_calls = true; 101 drop_message.num_calls = (*drop_entries)[i].count; 102 if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields, 103 &drop_message)) { 104 return false; 105 } 106 } 107 return true; 108 } 109 110 grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( 111 grpc_core::GrpcLbClientStats* client_stats) { 112 grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>( 113 gpr_zalloc(sizeof(grpc_grpclb_request))); 114 req->has_client_stats = true; 115 req->client_stats.has_timestamp = true; 116 populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); 117 req->client_stats.has_num_calls_started = true; 118 req->client_stats.has_num_calls_finished = true; 119 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; 120 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; 121 req->client_stats.has_num_calls_finished_known_received = true; 122 req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; 123 grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts> 124 drop_counts; 125 client_stats->GetLocked( 126 &req->client_stats.num_calls_started, 127 &req->client_stats.num_calls_finished, 128 &req->client_stats.num_calls_finished_with_client_failed_to_send, 129 &req->client_stats.num_calls_finished_known_received, &drop_counts); 130 // Will be deleted in grpc_grpclb_request_destroy(). 131 req->client_stats.calls_finished_with_drop.arg = drop_counts.release(); 132 return req; 133 } 134 135 grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) { 136 size_t encoded_length; 137 pb_ostream_t sizestream; 138 pb_ostream_t outputstream; 139 grpc_slice slice; 140 memset(&sizestream, 0, sizeof(pb_ostream_t)); 141 pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); 142 encoded_length = sizestream.bytes_written; 143 144 slice = GRPC_SLICE_MALLOC(encoded_length); 145 outputstream = 146 pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); 147 GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, 148 request) != 0); 149 return slice; 150 } 151 152 void grpc_grpclb_request_destroy(grpc_grpclb_request* request) { 153 if (request->has_client_stats) { 154 grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries = 155 static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>( 156 request->client_stats.calls_finished_with_drop.arg); 157 grpc_core::Delete(drop_entries); 158 } 159 gpr_free(request); 160 } 161 162 typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response; 163 grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse( 164 grpc_slice encoded_grpc_grpclb_response) { 165 pb_istream_t stream = 166 pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), 167 GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); 168 grpc_grpclb_response res; 169 memset(&res, 0, sizeof(grpc_grpclb_response)); 170 if (GPR_UNLIKELY( 171 !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) { 172 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); 173 return nullptr; 174 } 175 176 if (!res.has_initial_response) return nullptr; 177 178 grpc_grpclb_initial_response* initial_res = 179 static_cast<grpc_grpclb_initial_response*>( 180 gpr_malloc(sizeof(grpc_grpclb_initial_response))); 181 memcpy(initial_res, &res.initial_response, 182 sizeof(grpc_grpclb_initial_response)); 183 184 return initial_res; 185 } 186 187 grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist( 188 grpc_slice encoded_grpc_grpclb_response) { 189 pb_istream_t stream = 190 pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), 191 GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); 192 pb_istream_t stream_at_start = stream; 193 grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>( 194 gpr_zalloc(sizeof(grpc_grpclb_serverlist))); 195 grpc_grpclb_response res; 196 memset(&res, 0, sizeof(grpc_grpclb_response)); 197 // First pass: count number of servers. 198 res.server_list.servers.funcs.decode = count_serverlist; 199 res.server_list.servers.arg = sl; 200 bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); 201 if (GPR_UNLIKELY(!status)) { 202 gpr_free(sl); 203 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); 204 return nullptr; 205 } 206 // Second pass: populate servers. 207 if (sl->num_servers > 0) { 208 sl->servers = static_cast<grpc_grpclb_server**>( 209 gpr_zalloc(sizeof(grpc_grpclb_server*) * sl->num_servers)); 210 decode_serverlist_arg decode_arg; 211 memset(&decode_arg, 0, sizeof(decode_arg)); 212 decode_arg.serverlist = sl; 213 res.server_list.servers.funcs.decode = decode_serverlist; 214 res.server_list.servers.arg = &decode_arg; 215 status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, 216 &res); 217 if (GPR_UNLIKELY(!status)) { 218 grpc_grpclb_destroy_serverlist(sl); 219 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); 220 return nullptr; 221 } 222 } 223 return sl; 224 } 225 226 void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist) { 227 if (serverlist == nullptr) { 228 return; 229 } 230 for (size_t i = 0; i < serverlist->num_servers; i++) { 231 gpr_free(serverlist->servers[i]); 232 } 233 gpr_free(serverlist->servers); 234 gpr_free(serverlist); 235 } 236 237 grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy( 238 const grpc_grpclb_serverlist* sl) { 239 grpc_grpclb_serverlist* copy = static_cast<grpc_grpclb_serverlist*>( 240 gpr_zalloc(sizeof(grpc_grpclb_serverlist))); 241 copy->num_servers = sl->num_servers; 242 copy->servers = static_cast<grpc_grpclb_server**>( 243 gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers)); 244 for (size_t i = 0; i < sl->num_servers; i++) { 245 copy->servers[i] = static_cast<grpc_grpclb_server*>( 246 gpr_malloc(sizeof(grpc_grpclb_server))); 247 memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server)); 248 } 249 return copy; 250 } 251 252 bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs, 253 const grpc_grpclb_serverlist* rhs) { 254 if (lhs == nullptr || rhs == nullptr) { 255 return false; 256 } 257 if (lhs->num_servers != rhs->num_servers) { 258 return false; 259 } 260 for (size_t i = 0; i < lhs->num_servers; i++) { 261 if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { 262 return false; 263 } 264 } 265 return true; 266 } 267 268 bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs, 269 const grpc_grpclb_server* rhs) { 270 return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0; 271 } 272 273 int grpc_grpclb_duration_compare(const grpc_grpclb_duration* lhs, 274 const grpc_grpclb_duration* rhs) { 275 GPR_ASSERT(lhs && rhs); 276 if (lhs->has_seconds && rhs->has_seconds) { 277 if (lhs->seconds < rhs->seconds) return -1; 278 if (lhs->seconds > rhs->seconds) return 1; 279 } else if (lhs->has_seconds) { 280 return 1; 281 } else if (rhs->has_seconds) { 282 return -1; 283 } 284 285 GPR_ASSERT(lhs->seconds == rhs->seconds); 286 if (lhs->has_nanos && rhs->has_nanos) { 287 if (lhs->nanos < rhs->nanos) return -1; 288 if (lhs->nanos > rhs->nanos) return 1; 289 } else if (lhs->has_nanos) { 290 return 1; 291 } else if (rhs->has_nanos) { 292 return -1; 293 } 294 295 return 0; 296 } 297 298 grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration* duration_pb) { 299 return static_cast<grpc_millis>( 300 (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC + 301 (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS); 302 } 303 304 void grpc_grpclb_initial_response_destroy( 305 grpc_grpclb_initial_response* response) { 306 gpr_free(response); 307 } 308