/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/http/httpcli.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/slice/slice_internal.h"

/* Per-request state for one HTTP/1 client call. An instance is allocated and
   zero-filled in internal_request_begin() and released in finish(). */
typedef struct {
  /* Serialized request; added to `outgoing` by start_write() and unreffed in
     finish(). */
  grpc_slice request_text;
  /* Response parser, initialised with GRPC_HTTP_RESPONSE; on_read() feeds it
     every non-empty incoming slice. */
  grpc_http_parser parser;
  /* Output location handed to grpc_resolve_address(); destroyed in finish()
     when non-null. */
  grpc_resolved_addresses* addresses;
  /* Index into `addresses` of the next entry next_address() will try. */
  size_t next_address;
  /* Output location handed to grpc_tcp_client_connect(), later overwritten
     with the endpoint reported by the handshaker; destroyed in finish() when
     non-null. */
  grpc_endpoint* ep;
  /* Copy (gpr_strdup) of request->host. */
  char* host;
  /* Copy (gpr_strdup) of request->ssl_host_override; when non-null it is
     passed to the handshaker in place of `host`. */
  char* ssl_host_override;
  /* Deadline passed to both the TCP connect and the handshake. */
  grpc_millis deadline;
  /* Set to 1 by on_read() once a non-empty slice has been received. A read
     error before that moves on to the next address instead of finishing. */
  int have_read_byte;
  /* request->handshaker, or &grpc_httpcli_plaintext when that is null. */
  const grpc_httpcli_handshaker* handshaker;
  /* Caller's completion closure; scheduled by finish(). */
  grpc_closure* on_done;
  /* Caller-supplied context whose pollset_set is used for resolution and
     connection. */
  grpc_httpcli_context* context;
  /* Caller's polling entity; added to context->pollset_set when the request
     begins and removed in finish(). */
  grpc_polling_entity* pollent;
  /* iomgr object registered under the request's name; unregistered in
     finish(). */
  grpc_iomgr_object iomgr_obj;
  /* Buffer passed to grpc_endpoint_read(). */
  grpc_slice_buffer incoming;
  /* Buffer passed to grpc_endpoint_write(). */
  grpc_slice_buffer outgoing;
  /* Closure bound to on_read(); passed to grpc_endpoint_read(). */
  grpc_closure on_read;
  /* Closure bound to done_write(); passed to grpc_endpoint_write(). */
  grpc_closure done_write;
  /* Closure bound to on_connected(); re-initialised for every connect
     attempt. */
  grpc_closure connected;
  /* Parent error that append_error() attaches each per-address failure to;
     GRPC_ERROR_NONE until the first failure. */
  grpc_error* overall_error;
  /* Ref taken in internal_request_begin(), passed to the TCP connect as a
     channel arg, and dropped in finish(). */
  grpc_resource_quota* resource_quota;
} internal_request;

static grpc_httpcli_get_override g_get_override = nullptr; 65 static grpc_httpcli_post_override g_post_override = nullptr; 66 67 static void plaintext_handshake(void* arg, grpc_endpoint* endpoint, 68 const char* host, grpc_millis deadline, 69 void (*on_done)(void* arg, 70 grpc_endpoint* endpoint)) { 71 on_done(arg, endpoint); 72 } 73 74 const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", 75 plaintext_handshake}; 76 77 void grpc_httpcli_context_init(grpc_httpcli_context* context) { 78 context->pollset_set = grpc_pollset_set_create(); 79 } 80 81 void grpc_httpcli_context_destroy(grpc_httpcli_context* context) { 82 grpc_pollset_set_destroy(context->pollset_set); 83 } 84 85 static void next_address(internal_request* req, grpc_error* due_to_error); 86 87 static void finish(internal_request* req, grpc_error* error) { 88 grpc_polling_entity_del_from_pollset_set(req->pollent, 89 req->context->pollset_set); 90 GRPC_CLOSURE_SCHED(req->on_done, error); 91 grpc_http_parser_destroy(&req->parser); 92 if (req->addresses != nullptr) { 93 grpc_resolved_addresses_destroy(req->addresses); 94 } 95 if (req->ep != nullptr) { 96 grpc_endpoint_destroy(req->ep); 97 } 98 grpc_slice_unref_internal(req->request_text); 99 gpr_free(req->host); 100 gpr_free(req->ssl_host_override); 101 grpc_iomgr_unregister_object(&req->iomgr_obj); 102 grpc_slice_buffer_destroy_internal(&req->incoming); 103 grpc_slice_buffer_destroy_internal(&req->outgoing); 104 GRPC_ERROR_UNREF(req->overall_error); 105 grpc_resource_quota_unref_internal(req->resource_quota); 106 gpr_free(req); 107 } 108 109 static void append_error(internal_request* req, grpc_error* error) { 110 if (req->overall_error == GRPC_ERROR_NONE) { 111 req->overall_error = 112 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed HTTP/1 client request"); 113 } 114 grpc_resolved_address* addr = &req->addresses->addrs[req->next_address - 1]; 115 char* addr_text = grpc_sockaddr_to_uri(addr); 116 req->overall_error = grpc_error_add_child( 117 
req->overall_error, 118 grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, 119 grpc_slice_from_copied_string(addr_text))); 120 gpr_free(addr_text); 121 } 122 123 static void do_read(internal_request* req) { 124 grpc_endpoint_read(req->ep, &req->incoming, &req->on_read); 125 } 126 127 static void on_read(void* user_data, grpc_error* error) { 128 internal_request* req = static_cast<internal_request*>(user_data); 129 size_t i; 130 131 for (i = 0; i < req->incoming.count; i++) { 132 if (GRPC_SLICE_LENGTH(req->incoming.slices[i])) { 133 req->have_read_byte = 1; 134 grpc_error* err = grpc_http_parser_parse( 135 &req->parser, req->incoming.slices[i], nullptr); 136 if (err != GRPC_ERROR_NONE) { 137 finish(req, err); 138 return; 139 } 140 } 141 } 142 143 if (error == GRPC_ERROR_NONE) { 144 do_read(req); 145 } else if (!req->have_read_byte) { 146 next_address(req, GRPC_ERROR_REF(error)); 147 } else { 148 finish(req, grpc_http_parser_eof(&req->parser)); 149 } 150 } 151 152 static void on_written(internal_request* req) { do_read(req); } 153 154 static void done_write(void* arg, grpc_error* error) { 155 internal_request* req = static_cast<internal_request*>(arg); 156 if (error == GRPC_ERROR_NONE) { 157 on_written(req); 158 } else { 159 next_address(req, GRPC_ERROR_REF(error)); 160 } 161 } 162 163 static void start_write(internal_request* req) { 164 grpc_slice_ref_internal(req->request_text); 165 grpc_slice_buffer_add(&req->outgoing, req->request_text); 166 grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr); 167 } 168 169 static void on_handshake_done(void* arg, grpc_endpoint* ep) { 170 internal_request* req = static_cast<internal_request*>(arg); 171 172 if (!ep) { 173 next_address(req, GRPC_ERROR_CREATE_FROM_STATIC_STRING( 174 "Unexplained handshake failure")); 175 return; 176 } 177 178 req->ep = ep; 179 start_write(req); 180 } 181 182 static void on_connected(void* arg, grpc_error* error) { 183 internal_request* req = 
static_cast<internal_request*>(arg); 184 185 if (!req->ep) { 186 next_address(req, GRPC_ERROR_REF(error)); 187 return; 188 } 189 req->handshaker->handshake( 190 req, req->ep, req->ssl_host_override ? req->ssl_host_override : req->host, 191 req->deadline, on_handshake_done); 192 } 193 194 static void next_address(internal_request* req, grpc_error* error) { 195 grpc_resolved_address* addr; 196 if (error != GRPC_ERROR_NONE) { 197 append_error(req, error); 198 } 199 if (req->next_address == req->addresses->naddrs) { 200 finish(req, 201 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( 202 "Failed HTTP requests to all targets", &req->overall_error, 1)); 203 return; 204 } 205 addr = &req->addresses->addrs[req->next_address++]; 206 GRPC_CLOSURE_INIT(&req->connected, on_connected, req, 207 grpc_schedule_on_exec_ctx); 208 grpc_arg arg = grpc_channel_arg_pointer_create( 209 (char*)GRPC_ARG_RESOURCE_QUOTA, req->resource_quota, 210 grpc_resource_quota_arg_vtable()); 211 grpc_channel_args args = {1, &arg}; 212 grpc_tcp_client_connect(&req->connected, &req->ep, req->context->pollset_set, 213 &args, addr, req->deadline); 214 } 215 216 static void on_resolved(void* arg, grpc_error* error) { 217 internal_request* req = static_cast<internal_request*>(arg); 218 if (error != GRPC_ERROR_NONE) { 219 finish(req, GRPC_ERROR_REF(error)); 220 return; 221 } 222 req->next_address = 0; 223 next_address(req, GRPC_ERROR_NONE); 224 } 225 226 static void internal_request_begin(grpc_httpcli_context* context, 227 grpc_polling_entity* pollent, 228 grpc_resource_quota* resource_quota, 229 const grpc_httpcli_request* request, 230 grpc_millis deadline, grpc_closure* on_done, 231 grpc_httpcli_response* response, 232 const char* name, grpc_slice request_text) { 233 internal_request* req = 234 static_cast<internal_request*>(gpr_malloc(sizeof(internal_request))); 235 memset(req, 0, sizeof(*req)); 236 req->request_text = request_text; 237 grpc_http_parser_init(&req->parser, GRPC_HTTP_RESPONSE, response); 238 
req->on_done = on_done; 239 req->deadline = deadline; 240 req->handshaker = 241 request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; 242 req->context = context; 243 req->pollent = pollent; 244 req->overall_error = GRPC_ERROR_NONE; 245 req->resource_quota = grpc_resource_quota_ref_internal(resource_quota); 246 GRPC_CLOSURE_INIT(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx); 247 GRPC_CLOSURE_INIT(&req->done_write, done_write, req, 248 grpc_schedule_on_exec_ctx); 249 grpc_slice_buffer_init(&req->incoming); 250 grpc_slice_buffer_init(&req->outgoing); 251 grpc_iomgr_register_object(&req->iomgr_obj, name); 252 req->host = gpr_strdup(request->host); 253 req->ssl_host_override = gpr_strdup(request->ssl_host_override); 254 255 GPR_ASSERT(pollent); 256 grpc_polling_entity_add_to_pollset_set(req->pollent, 257 req->context->pollset_set); 258 grpc_resolve_address( 259 request->host, req->handshaker->default_port, req->context->pollset_set, 260 GRPC_CLOSURE_CREATE(on_resolved, req, grpc_schedule_on_exec_ctx), 261 &req->addresses); 262 } 263 264 void grpc_httpcli_get(grpc_httpcli_context* context, 265 grpc_polling_entity* pollent, 266 grpc_resource_quota* resource_quota, 267 const grpc_httpcli_request* request, grpc_millis deadline, 268 grpc_closure* on_done, grpc_httpcli_response* response) { 269 char* name; 270 if (g_get_override && g_get_override(request, deadline, on_done, response)) { 271 return; 272 } 273 gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path); 274 internal_request_begin(context, pollent, resource_quota, request, deadline, 275 on_done, response, name, 276 grpc_httpcli_format_get_request(request)); 277 gpr_free(name); 278 } 279 280 void grpc_httpcli_post(grpc_httpcli_context* context, 281 grpc_polling_entity* pollent, 282 grpc_resource_quota* resource_quota, 283 const grpc_httpcli_request* request, 284 const char* body_bytes, size_t body_size, 285 grpc_millis deadline, grpc_closure* on_done, 286 
grpc_httpcli_response* response) { 287 char* name; 288 if (g_post_override && g_post_override(request, body_bytes, body_size, 289 deadline, on_done, response)) { 290 return; 291 } 292 gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path); 293 internal_request_begin( 294 context, pollent, resource_quota, request, deadline, on_done, response, 295 name, grpc_httpcli_format_post_request(request, body_bytes, body_size)); 296 gpr_free(name); 297 } 298 299 void grpc_httpcli_set_override(grpc_httpcli_get_override get, 300 grpc_httpcli_post_override post) { 301 g_get_override = get; 302 g_post_override = post; 303 } 304