Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/transport/byte_stream.h"
     22 
     23 #include <stdlib.h>
     24 #include <string.h>
     25 
     26 #include <grpc/support/log.h>
     27 
     28 #include "src/core/lib/gprpp/memory.h"
     29 #include "src/core/lib/slice/slice_internal.h"
     30 
     31 namespace grpc_core {
     32 
     33 //
     34 // SliceBufferByteStream
     35 //
     36 
     37 SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
     38                                              uint32_t flags)
     39     : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
     40   GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
     41   grpc_slice_buffer_init(&backing_buffer_);
     42   grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
     43 }
     44 
     45 SliceBufferByteStream::~SliceBufferByteStream() {}
     46 
     47 void SliceBufferByteStream::Orphan() {
     48   grpc_slice_buffer_destroy_internal(&backing_buffer_);
     49   GRPC_ERROR_UNREF(shutdown_error_);
     50   // Note: We do not actually delete the object here, since
     51   // SliceBufferByteStream is usually allocated as part of a larger
     52   // object and has an OrphanablePtr of itself passed down through the
     53   // filter stack.
     54 }
     55 
     56 bool SliceBufferByteStream::Next(size_t max_size_hint,
     57                                  grpc_closure* on_complete) {
     58   GPR_ASSERT(cursor_ < backing_buffer_.count);
     59   return true;
     60 }
     61 
     62 grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
     63   if (shutdown_error_ != GRPC_ERROR_NONE) {
     64     return GRPC_ERROR_REF(shutdown_error_);
     65   }
     66   GPR_ASSERT(cursor_ < backing_buffer_.count);
     67   *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]);
     68   ++cursor_;
     69   return GRPC_ERROR_NONE;
     70 }
     71 
     72 void SliceBufferByteStream::Shutdown(grpc_error* error) {
     73   GRPC_ERROR_UNREF(shutdown_error_);
     74   shutdown_error_ = error;
     75 }
     76 
     77 //
     78 // ByteStreamCache
     79 //
     80 
     81 ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
     82     : underlying_stream_(std::move(underlying_stream)),
     83       length_(underlying_stream_->length()),
     84       flags_(underlying_stream_->flags()) {
     85   grpc_slice_buffer_init(&cache_buffer_);
     86 }
     87 
     88 ByteStreamCache::~ByteStreamCache() { Destroy(); }
     89 
     90 void ByteStreamCache::Destroy() {
     91   underlying_stream_.reset();
     92   if (cache_buffer_.length > 0) {
     93     grpc_slice_buffer_destroy_internal(&cache_buffer_);
     94   }
     95 }
     96 
     97 //
     98 // ByteStreamCache::CachingByteStream
     99 //
    100 
    101 ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
    102     : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
    103 
    104 ByteStreamCache::CachingByteStream::~CachingByteStream() {}
    105 
    106 void ByteStreamCache::CachingByteStream::Orphan() {
    107   GRPC_ERROR_UNREF(shutdown_error_);
    108   // Note: We do not actually delete the object here, since
    109   // CachingByteStream is usually allocated as part of a larger
    110   // object and has an OrphanablePtr of itself passed down through the
    111   // filter stack.
    112 }
    113 
    114 bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
    115                                               grpc_closure* on_complete) {
    116   if (shutdown_error_ != GRPC_ERROR_NONE) return true;
    117   if (cursor_ < cache_->cache_buffer_.count) return true;
    118   GPR_ASSERT(cache_->underlying_stream_ != nullptr);
    119   return cache_->underlying_stream_->Next(max_size_hint, on_complete);
    120 }
    121 
    122 grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
    123   if (shutdown_error_ != GRPC_ERROR_NONE) {
    124     return GRPC_ERROR_REF(shutdown_error_);
    125   }
    126   if (cursor_ < cache_->cache_buffer_.count) {
    127     *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
    128     ++cursor_;
    129     offset_ += GRPC_SLICE_LENGTH(*slice);
    130     return GRPC_ERROR_NONE;
    131   }
    132   GPR_ASSERT(cache_->underlying_stream_ != nullptr);
    133   grpc_error* error = cache_->underlying_stream_->Pull(slice);
    134   if (error == GRPC_ERROR_NONE) {
    135     grpc_slice_buffer_add(&cache_->cache_buffer_,
    136                           grpc_slice_ref_internal(*slice));
    137     ++cursor_;
    138     offset_ += GRPC_SLICE_LENGTH(*slice);
    139     // Orphan the underlying stream if it's been drained.
    140     if (offset_ == cache_->underlying_stream_->length()) {
    141       cache_->underlying_stream_.reset();
    142     }
    143   }
    144   return error;
    145 }
    146 
    147 void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
    148   GRPC_ERROR_UNREF(shutdown_error_);
    149   shutdown_error_ = GRPC_ERROR_REF(error);
    150   if (cache_->underlying_stream_ != nullptr) {
    151     cache_->underlying_stream_->Shutdown(error);
    152   }
    153 }
    154 
    155 void ByteStreamCache::CachingByteStream::Reset() {
    156   cursor_ = 0;
    157   offset_ = 0;
    158 }
    159 
    160 }  // namespace grpc_core
    161