Home | History | Annotate | Download | only in grpclb
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "pb_decode.h"
     22 #include "pb_encode.h"
     23 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
     24 
     25 #include <grpc/support/alloc.h>
     26 
     27 /* invoked once for every Server in ServerList */
     28 static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
     29                              void** arg) {
     30   grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(*arg);
     31   grpc_grpclb_server server;
     32   if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
     33     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
     34     return false;
     35   }
     36   ++sl->num_servers;
     37   return true;
     38 }
     39 
     40 typedef struct decode_serverlist_arg {
     41   /* The decoding callback is invoked once per server in serverlist. Remember
     42    * which index of the serverlist are we currently decoding */
     43   size_t decoding_idx;
     44   /* The decoded serverlist */
     45   grpc_grpclb_serverlist* serverlist;
     46 } decode_serverlist_arg;
     47 
     48 /* invoked once for every Server in ServerList */
     49 static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
     50                               void** arg) {
     51   decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg);
     52   GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
     53   grpc_grpclb_server* server =
     54       static_cast<grpc_grpclb_server*>(gpr_zalloc(sizeof(grpc_grpclb_server)));
     55   if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
     56     gpr_free(server);
     57     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
     58     return false;
     59   }
     60   dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
     61   return true;
     62 }
     63 
     64 grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name) {
     65   grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
     66       gpr_malloc(sizeof(grpc_grpclb_request)));
     67   req->has_client_stats = false;
     68   req->has_initial_request = true;
     69   req->initial_request.has_name = true;
     70   strncpy(req->initial_request.name, lb_service_name,
     71           GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH);
     72   return req;
     73 }
     74 
     75 static void populate_timestamp(gpr_timespec timestamp,
     76                                grpc_grpclb_timestamp* timestamp_pb) {
     77   timestamp_pb->has_seconds = true;
     78   timestamp_pb->seconds = timestamp.tv_sec;
     79   timestamp_pb->has_nanos = true;
     80   timestamp_pb->nanos = timestamp.tv_nsec;
     81 }
     82 
     83 static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
     84                           void* const* arg) {
     85   char* str = static_cast<char*>(*arg);
     86   if (!pb_encode_tag_for_field(stream, field)) return false;
     87   return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str));
     88 }
     89 
     90 static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
     91                          void* const* arg) {
     92   grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
     93       static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg);
     94   if (drop_entries == nullptr) return true;
     95   for (size_t i = 0; i < drop_entries->size(); ++i) {
     96     if (!pb_encode_tag_for_field(stream, field)) return false;
     97     grpc_lb_v1_ClientStatsPerToken drop_message;
     98     drop_message.load_balance_token.funcs.encode = encode_string;
     99     drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
    100     drop_message.has_num_calls = true;
    101     drop_message.num_calls = (*drop_entries)[i].count;
    102     if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
    103                               &drop_message)) {
    104       return false;
    105     }
    106   }
    107   return true;
    108 }
    109 
    110 grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
    111     grpc_core::GrpcLbClientStats* client_stats) {
    112   grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
    113       gpr_zalloc(sizeof(grpc_grpclb_request)));
    114   req->has_client_stats = true;
    115   req->client_stats.has_timestamp = true;
    116   populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
    117   req->client_stats.has_num_calls_started = true;
    118   req->client_stats.has_num_calls_finished = true;
    119   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
    120   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
    121   req->client_stats.has_num_calls_finished_known_received = true;
    122   req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
    123   grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
    124       drop_counts;
    125   client_stats->GetLocked(
    126       &req->client_stats.num_calls_started,
    127       &req->client_stats.num_calls_finished,
    128       &req->client_stats.num_calls_finished_with_client_failed_to_send,
    129       &req->client_stats.num_calls_finished_known_received, &drop_counts);
    130   // Will be deleted in grpc_grpclb_request_destroy().
    131   req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
    132   return req;
    133 }
    134 
    135 grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) {
    136   size_t encoded_length;
    137   pb_ostream_t sizestream;
    138   pb_ostream_t outputstream;
    139   grpc_slice slice;
    140   memset(&sizestream, 0, sizeof(pb_ostream_t));
    141   pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
    142   encoded_length = sizestream.bytes_written;
    143 
    144   slice = GRPC_SLICE_MALLOC(encoded_length);
    145   outputstream =
    146       pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length);
    147   GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
    148                        request) != 0);
    149   return slice;
    150 }
    151 
    152 void grpc_grpclb_request_destroy(grpc_grpclb_request* request) {
    153   if (request->has_client_stats) {
    154     grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
    155         static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(
    156             request->client_stats.calls_finished_with_drop.arg);
    157     grpc_core::Delete(drop_entries);
    158   }
    159   gpr_free(request);
    160 }
    161 
    162 typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
    163 grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
    164     grpc_slice encoded_grpc_grpclb_response) {
    165   pb_istream_t stream =
    166       pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
    167                              GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
    168   grpc_grpclb_response res;
    169   memset(&res, 0, sizeof(grpc_grpclb_response));
    170   if (GPR_UNLIKELY(
    171           !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
    172     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
    173     return nullptr;
    174   }
    175 
    176   if (!res.has_initial_response) return nullptr;
    177 
    178   grpc_grpclb_initial_response* initial_res =
    179       static_cast<grpc_grpclb_initial_response*>(
    180           gpr_malloc(sizeof(grpc_grpclb_initial_response)));
    181   memcpy(initial_res, &res.initial_response,
    182          sizeof(grpc_grpclb_initial_response));
    183 
    184   return initial_res;
    185 }
    186 
    187 grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
    188     grpc_slice encoded_grpc_grpclb_response) {
    189   pb_istream_t stream =
    190       pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
    191                              GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
    192   pb_istream_t stream_at_start = stream;
    193   grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(
    194       gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
    195   grpc_grpclb_response res;
    196   memset(&res, 0, sizeof(grpc_grpclb_response));
    197   // First pass: count number of servers.
    198   res.server_list.servers.funcs.decode = count_serverlist;
    199   res.server_list.servers.arg = sl;
    200   bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
    201   if (GPR_UNLIKELY(!status)) {
    202     gpr_free(sl);
    203     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
    204     return nullptr;
    205   }
    206   // Second pass: populate servers.
    207   if (sl->num_servers > 0) {
    208     sl->servers = static_cast<grpc_grpclb_server**>(
    209         gpr_zalloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
    210     decode_serverlist_arg decode_arg;
    211     memset(&decode_arg, 0, sizeof(decode_arg));
    212     decode_arg.serverlist = sl;
    213     res.server_list.servers.funcs.decode = decode_serverlist;
    214     res.server_list.servers.arg = &decode_arg;
    215     status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
    216                        &res);
    217     if (GPR_UNLIKELY(!status)) {
    218       grpc_grpclb_destroy_serverlist(sl);
    219       gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
    220       return nullptr;
    221     }
    222   }
    223   return sl;
    224 }
    225 
    226 void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist) {
    227   if (serverlist == nullptr) {
    228     return;
    229   }
    230   for (size_t i = 0; i < serverlist->num_servers; i++) {
    231     gpr_free(serverlist->servers[i]);
    232   }
    233   gpr_free(serverlist->servers);
    234   gpr_free(serverlist);
    235 }
    236 
    237 grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
    238     const grpc_grpclb_serverlist* sl) {
    239   grpc_grpclb_serverlist* copy = static_cast<grpc_grpclb_serverlist*>(
    240       gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
    241   copy->num_servers = sl->num_servers;
    242   copy->servers = static_cast<grpc_grpclb_server**>(
    243       gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
    244   for (size_t i = 0; i < sl->num_servers; i++) {
    245     copy->servers[i] = static_cast<grpc_grpclb_server*>(
    246         gpr_malloc(sizeof(grpc_grpclb_server)));
    247     memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server));
    248   }
    249   return copy;
    250 }
    251 
    252 bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
    253                                    const grpc_grpclb_serverlist* rhs) {
    254   if (lhs == nullptr || rhs == nullptr) {
    255     return false;
    256   }
    257   if (lhs->num_servers != rhs->num_servers) {
    258     return false;
    259   }
    260   for (size_t i = 0; i < lhs->num_servers; i++) {
    261     if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
    262       return false;
    263     }
    264   }
    265   return true;
    266 }
    267 
    268 bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs,
    269                                const grpc_grpclb_server* rhs) {
    270   return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0;
    271 }
    272 
    273 int grpc_grpclb_duration_compare(const grpc_grpclb_duration* lhs,
    274                                  const grpc_grpclb_duration* rhs) {
    275   GPR_ASSERT(lhs && rhs);
    276   if (lhs->has_seconds && rhs->has_seconds) {
    277     if (lhs->seconds < rhs->seconds) return -1;
    278     if (lhs->seconds > rhs->seconds) return 1;
    279   } else if (lhs->has_seconds) {
    280     return 1;
    281   } else if (rhs->has_seconds) {
    282     return -1;
    283   }
    284 
    285   GPR_ASSERT(lhs->seconds == rhs->seconds);
    286   if (lhs->has_nanos && rhs->has_nanos) {
    287     if (lhs->nanos < rhs->nanos) return -1;
    288     if (lhs->nanos > rhs->nanos) return 1;
    289   } else if (lhs->has_nanos) {
    290     return 1;
    291   } else if (rhs->has_nanos) {
    292     return -1;
    293   }
    294 
    295   return 0;
    296 }
    297 
    298 grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration* duration_pb) {
    299   return static_cast<grpc_millis>(
    300       (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC +
    301       (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS);
    302 }
    303 
/* Frees an initial response, such as one returned by
 * grpc_grpclb_initial_response_parse(), with gpr_free(). */
void grpc_grpclb_initial_response_destroy(
    grpc_grpclb_initial_response* response) {
  gpr_free(response);
}
    308