1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/transport/byte_stream.h" 22 23 #include <stdlib.h> 24 #include <string.h> 25 26 #include <grpc/support/log.h> 27 28 #include "src/core/lib/gprpp/memory.h" 29 #include "src/core/lib/slice/slice_internal.h" 30 31 namespace grpc_core { 32 33 // 34 // SliceBufferByteStream 35 // 36 37 SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, 38 uint32_t flags) 39 : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) { 40 GPR_ASSERT(slice_buffer->length <= UINT32_MAX); 41 grpc_slice_buffer_init(&backing_buffer_); 42 grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); 43 } 44 45 SliceBufferByteStream::~SliceBufferByteStream() {} 46 47 void SliceBufferByteStream::Orphan() { 48 grpc_slice_buffer_destroy_internal(&backing_buffer_); 49 GRPC_ERROR_UNREF(shutdown_error_); 50 // Note: We do not actually delete the object here, since 51 // SliceBufferByteStream is usually allocated as part of a larger 52 // object and has an OrphanablePtr of itself passed down through the 53 // filter stack. 54 } 55 56 bool SliceBufferByteStream::Next(size_t max_size_hint, 57 grpc_closure* on_complete) { 58 GPR_ASSERT(cursor_ < backing_buffer_.count); 59 return true; 60 } 61 62 grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) { 63 if (shutdown_error_ != GRPC_ERROR_NONE) { 64 return GRPC_ERROR_REF(shutdown_error_); 65 } 66 GPR_ASSERT(cursor_ < backing_buffer_.count); 67 *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]); 68 ++cursor_; 69 return GRPC_ERROR_NONE; 70 } 71 72 void SliceBufferByteStream::Shutdown(grpc_error* error) { 73 GRPC_ERROR_UNREF(shutdown_error_); 74 shutdown_error_ = error; 75 } 76 77 // 78 // ByteStreamCache 79 // 80 81 ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) 82 : underlying_stream_(std::move(underlying_stream)), 83 length_(underlying_stream_->length()), 84 flags_(underlying_stream_->flags()) { 85 grpc_slice_buffer_init(&cache_buffer_); 86 } 87 88 ByteStreamCache::~ByteStreamCache() { Destroy(); } 89 90 void ByteStreamCache::Destroy() { 91 underlying_stream_.reset(); 92 if (cache_buffer_.length > 0) { 93 grpc_slice_buffer_destroy_internal(&cache_buffer_); 94 } 95 } 96 97 // 98 // ByteStreamCache::CachingByteStream 99 // 100 101 ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) 102 : ByteStream(cache->length_, cache->flags_), cache_(cache) {} 103 104 ByteStreamCache::CachingByteStream::~CachingByteStream() {} 105 106 void ByteStreamCache::CachingByteStream::Orphan() { 107 GRPC_ERROR_UNREF(shutdown_error_); 108 // Note: We do not actually delete the object here, since 109 // CachingByteStream is usually allocated as part of a larger 110 // object and has an OrphanablePtr of itself passed down through the 111 // filter stack. 112 } 113 114 bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, 115 grpc_closure* on_complete) { 116 if (shutdown_error_ != GRPC_ERROR_NONE) return true; 117 if (cursor_ < cache_->cache_buffer_.count) return true; 118 GPR_ASSERT(cache_->underlying_stream_ != nullptr); 119 return cache_->underlying_stream_->Next(max_size_hint, on_complete); 120 } 121 122 grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { 123 if (shutdown_error_ != GRPC_ERROR_NONE) { 124 return GRPC_ERROR_REF(shutdown_error_); 125 } 126 if (cursor_ < cache_->cache_buffer_.count) { 127 *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); 128 ++cursor_; 129 offset_ += GRPC_SLICE_LENGTH(*slice); 130 return GRPC_ERROR_NONE; 131 } 132 GPR_ASSERT(cache_->underlying_stream_ != nullptr); 133 grpc_error* error = cache_->underlying_stream_->Pull(slice); 134 if (error == GRPC_ERROR_NONE) { 135 grpc_slice_buffer_add(&cache_->cache_buffer_, 136 grpc_slice_ref_internal(*slice)); 137 ++cursor_; 138 offset_ += GRPC_SLICE_LENGTH(*slice); 139 // Orphan the underlying stream if it's been drained. 140 if (offset_ == cache_->underlying_stream_->length()) { 141 cache_->underlying_stream_.reset(); 142 } 143 } 144 return error; 145 } 146 147 void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) { 148 GRPC_ERROR_UNREF(shutdown_error_); 149 shutdown_error_ = GRPC_ERROR_REF(error); 150 if (cache_->underlying_stream_ != nullptr) { 151 cache_->underlying_stream_->Shutdown(error); 152 } 153 } 154 155 void ByteStreamCache::CachingByteStream::Reset() { 156 cursor_ = 0; 157 offset_ = 0; 158 } 159 160 } // namespace grpc_core 161