Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
     22 #include "src/core/ext/transport/chttp2/transport/internal.h"
     23 
     24 #include <grpc/support/log.h>
     25 
     26 static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
     27   switch (id) {
     28     case GRPC_CHTTP2_LIST_WRITABLE:
     29       return "writable";
     30     case GRPC_CHTTP2_LIST_WRITING:
     31       return "writing";
     32     case GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT:
     33       return "stalled_by_transport";
     34     case GRPC_CHTTP2_LIST_STALLED_BY_STREAM:
     35       return "stalled_by_stream";
     36     case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
     37       return "waiting_for_concurrency";
     38     case STREAM_LIST_COUNT:
     39       GPR_UNREACHABLE_CODE(return "unknown");
     40   }
     41   GPR_UNREACHABLE_CODE(return "unknown");
     42 }
     43 
/* Trace flag registered under the name "http2_stream_state". The list helpers
   in this file check its enabled() state before logging each pop, remove and
   add. The `false` constructor argument is presumably the "enabled by default"
   setting -- confirm against grpc_core::TraceFlag. */
grpc_core::TraceFlag grpc_trace_http2_stream_state(false, "http2_stream_state");
     45 
     46 /* core list management */
     47 
     48 static bool stream_list_empty(grpc_chttp2_transport* t,
     49                               grpc_chttp2_stream_list_id id) {
     50   return t->lists[id].head == nullptr;
     51 }
     52 
     53 static bool stream_list_pop(grpc_chttp2_transport* t,
     54                             grpc_chttp2_stream** stream,
     55                             grpc_chttp2_stream_list_id id) {
     56   grpc_chttp2_stream* s = t->lists[id].head;
     57   if (s) {
     58     grpc_chttp2_stream* new_head = s->links[id].next;
     59     GPR_ASSERT(s->included[id]);
     60     if (new_head) {
     61       t->lists[id].head = new_head;
     62       new_head->links[id].prev = nullptr;
     63     } else {
     64       t->lists[id].head = nullptr;
     65       t->lists[id].tail = nullptr;
     66     }
     67     s->included[id] = 0;
     68   }
     69   *stream = s;
     70   if (s && grpc_trace_http2_stream_state.enabled()) {
     71     gpr_log(GPR_INFO, "%p[%d][%s]: pop from %s", t, s->id,
     72             t->is_client ? "cli" : "svr", stream_list_id_string(id));
     73   }
     74   return s != nullptr;
     75 }
     76 
     77 static void stream_list_remove(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
     78                                grpc_chttp2_stream_list_id id) {
     79   GPR_ASSERT(s->included[id]);
     80   s->included[id] = 0;
     81   if (s->links[id].prev) {
     82     s->links[id].prev->links[id].next = s->links[id].next;
     83   } else {
     84     GPR_ASSERT(t->lists[id].head == s);
     85     t->lists[id].head = s->links[id].next;
     86   }
     87   if (s->links[id].next) {
     88     s->links[id].next->links[id].prev = s->links[id].prev;
     89   } else {
     90     t->lists[id].tail = s->links[id].prev;
     91   }
     92   if (grpc_trace_http2_stream_state.enabled()) {
     93     gpr_log(GPR_INFO, "%p[%d][%s]: remove from %s", t, s->id,
     94             t->is_client ? "cli" : "svr", stream_list_id_string(id));
     95   }
     96 }
     97 
     98 static bool stream_list_maybe_remove(grpc_chttp2_transport* t,
     99                                      grpc_chttp2_stream* s,
    100                                      grpc_chttp2_stream_list_id id) {
    101   if (s->included[id]) {
    102     stream_list_remove(t, s, id);
    103     return true;
    104   } else {
    105     return false;
    106   }
    107 }
    108 
    109 static void stream_list_add_tail(grpc_chttp2_transport* t,
    110                                  grpc_chttp2_stream* s,
    111                                  grpc_chttp2_stream_list_id id) {
    112   grpc_chttp2_stream* old_tail;
    113   GPR_ASSERT(!s->included[id]);
    114   old_tail = t->lists[id].tail;
    115   s->links[id].next = nullptr;
    116   s->links[id].prev = old_tail;
    117   if (old_tail) {
    118     old_tail->links[id].next = s;
    119   } else {
    120     t->lists[id].head = s;
    121   }
    122   t->lists[id].tail = s;
    123   s->included[id] = 1;
    124   if (grpc_trace_http2_stream_state.enabled()) {
    125     gpr_log(GPR_INFO, "%p[%d][%s]: add to %s", t, s->id,
    126             t->is_client ? "cli" : "svr", stream_list_id_string(id));
    127   }
    128 }
    129 
    130 static bool stream_list_add(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
    131                             grpc_chttp2_stream_list_id id) {
    132   if (s->included[id]) {
    133     return false;
    134   }
    135   stream_list_add_tail(t, s, id);
    136   return true;
    137 }
    138 
    139 /* wrappers for specializations */
    140 
    141 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
    142                                           grpc_chttp2_stream* s) {
    143   GPR_ASSERT(s->id != 0);
    144   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
    145 }
    146 
    147 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
    148                                           grpc_chttp2_stream** s) {
    149   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
    150 }
    151 
    152 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
    153                                              grpc_chttp2_stream* s) {
    154   return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WRITABLE);
    155 }
    156 
    157 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
    158                                          grpc_chttp2_stream* s) {
    159   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
    160 }
    161 
    162 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t) {
    163   return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
    164 }
    165 
    166 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
    167                                          grpc_chttp2_stream** s) {
    168   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
    169 }
    170 
    171 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
    172                                                   grpc_chttp2_stream* s) {
    173   stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
    174 }
    175 
    176 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
    177                                                   grpc_chttp2_stream** s) {
    178   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
    179 }
    180 
    181 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
    182                                                      grpc_chttp2_stream* s) {
    183   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
    184 }
    185 
    186 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
    187                                                grpc_chttp2_stream* s) {
    188   GPR_ASSERT(t->flow_control->flow_control_enabled());
    189   stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
    190 }
    191 
    192 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
    193                                                grpc_chttp2_stream** s) {
    194   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
    195 }
    196 
    197 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
    198                                                   grpc_chttp2_stream* s) {
    199   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
    200 }
    201 
    202 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
    203                                             grpc_chttp2_stream* s) {
    204   GPR_ASSERT(t->flow_control->flow_control_enabled());
    205   stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
    206 }
    207 
    208 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
    209                                             grpc_chttp2_stream** s) {
    210   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
    211 }
    212 
    213 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
    214                                                grpc_chttp2_stream* s) {
    215   return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
    216 }
    217