Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H
     20 #define GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H
     21 
     22 #include <type_traits>
     23 
     24 #include <grpc/impl/codegen/grpc_types.h>
     25 #include <grpc/impl/codegen/slice.h>
     26 #include <grpcpp/impl/codegen/byte_buffer.h>
     27 #include <grpcpp/impl/codegen/config_protobuf.h>
     28 #include <grpcpp/impl/codegen/core_codegen_interface.h>
     29 #include <grpcpp/impl/codegen/serialization_traits.h>
     30 #include <grpcpp/impl/codegen/status.h>
     31 
     32 /// This header provides an object that writes bytes directly into a
     33 /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface
     34 
     35 namespace grpc {
     36 
     37 extern CoreCodegenInterface* g_core_codegen_interface;  // declaration only; no definition in this header
        // ProtoBufferWriter below calls through this pointer to create the raw
        // byte buffer and to allocate, add, pop, split and unref slices.
     38 
     39 // Forward declaration for testing use only; ProtoBufferWriter declares this
        // class as a friend.
     40 namespace internal {
     41 class ProtoBufferWriterPeer;
     42 }  // namespace internal
     43 
        // 1024 * 1024 bytes (1 MiB). Not referenced anywhere else in this header;
        // presumably an upper bound for the block size callers pass to
        // ProtoBufferWriter -- TODO confirm against the call sites.
     44 const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
     45 
     46 /// This is a specialization of the protobuf class ZeroCopyOutputStream.
     47 /// The principle is to give the proto layer one buffer of bytes at a time
     48 /// that it can use to serialize the next portion of the message, with the
     49 /// option to "backup" if more buffer is given than required at the last buffer.
     50 ///
     51 /// Read more about ZeroCopyOutputStream interface here:
     52 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream
     53 class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
     54  public:
     55   /// Constructor for this derived class
     56   ///
     57   /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created
     58   /// \param block_size How big are the chunks to allocate at a time
     59   /// \param total_size How many total bytes are required for this proto
     60   ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
     61       : block_size_(block_size),
     62         total_size_(total_size),
     63         byte_count_(0),
     64         have_backup_(false) {
     65     GPR_CODEGEN_ASSERT(!byte_buffer->Valid());
     66     /// Create an empty raw byte buffer and look at its underlying slice buffer
     67     grpc_byte_buffer* bp =
     68         g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
     69     byte_buffer->set_buffer(bp);
     70     slice_buffer_ = &bp->data.raw.slice_buffer;
     71   }
     72 
     73   ~ProtoBufferWriter() {
     74     if (have_backup_) {
     75       g_core_codegen_interface->grpc_slice_unref(backup_slice_);
     76     }
     77   }
     78 
     79   /// Give the proto library the next buffer of bytes and its size. It is
     80   /// safe for the caller to write from data[0, size - 1].
     81   bool Next(void** data, int* size) override {
     82     // Protobuf should not ask for more memory than total_size_.
     83     GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
     84     // 1. Use the remaining backup slice if we have one
     85     // 2. Otherwise allocate a slice, up to the remaining length needed
     86     //    or our maximum allocation size
     87     // 3. Provide the slice start and size available
     88     // 4. Add the slice being returned to the slice buffer
     89     size_t remain = total_size_ - byte_count_;
     90     if (have_backup_) {
     91       /// If we have a backup slice, we should use it first
     92       slice_ = backup_slice_;
     93       have_backup_ = false;
     94       if (GRPC_SLICE_LENGTH(slice_) > remain) {
     95         GRPC_SLICE_SET_LENGTH(slice_, remain);
     96       }
     97     } else {
     98       // When less than a whole block is needed, only allocate that much.
     99       // But make sure the allocated slice is not inlined.
    100       size_t allocate_length =
    101           remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
    102       slice_ = g_core_codegen_interface->grpc_slice_malloc(
    103           allocate_length > GRPC_SLICE_INLINED_SIZE
    104               ? allocate_length
    105               : GRPC_SLICE_INLINED_SIZE + 1);
    106     }
    107     *data = GRPC_SLICE_START_PTR(slice_);
    108     // On win x64, int is only 32bit
    109     GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
    110     byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
    111     g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
    112     return true;
    113   }
    114 
    115   /// Backup by \a count bytes because Next returned more bytes than needed
    116   /// (only used in the last buffer). \a count must be less than or equal too
    117   /// the last buffer returned from next.
    118   void BackUp(int count) override {
    119     /// 1. Remove the partially-used last slice from the slice buffer
    120     /// 2. Split it into the needed (if any) and unneeded part
    121     /// 3. Add the needed part back to the slice buffer
    122     /// 4. Mark that we still have the remaining part (for later use/unref)
    123     GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
    124     g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
    125     if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
    126       backup_slice_ = slice_;
    127     } else {
    128       backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
    129           &slice_, GRPC_SLICE_LENGTH(slice_) - count);
    130       g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
    131     }
    132     // It's dangerous to keep an inlined grpc_slice as the backup slice, since
    133     // on a following Next() call, a reference will be returned to this slice
    134     // via GRPC_SLICE_START_PTR, which will not be an address held by
    135     // slice_buffer_.
    136     have_backup_ = backup_slice_.refcount != NULL;
    137     byte_count_ -= count;
    138   }
    139 
    140   /// Returns the total number of bytes written since this object was created.
    141   grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
    142 
    143   // These protected members are needed to support internal optimizations.
    144   // they expose internal bits of grpc core that are NOT stable. If you have
    145   // a use case needs to use one of these functions, please send an email to
    146   // https://groups.google.com/forum/#!forum/grpc-io.
    147  protected:
    148   grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
    149   void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
    150 
    151  private:
    152   // friend for testing purposes only
    153   friend class internal::ProtoBufferWriterPeer;
    154   const int block_size_;  ///< size to alloc for each new \a grpc_slice needed
    155   const int total_size_;  ///< byte size of proto being serialized
    156   int64_t byte_count_;    ///< bytes written since this object was created
    157   grpc_slice_buffer*
    158       slice_buffer_;  ///< internal buffer of slices holding the serialized data
    159   bool have_backup_;  ///< if we are holding a backup slice or not
    160   grpc_slice backup_slice_;  ///< holds space we can still write to, if the
    161                              ///< caller has called BackUp
    162   grpc_slice slice_;         ///< current slice passed back to the caller
    163 };
    164 
    165 }  // namespace grpc
    166 
    167 #endif  // GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H
    168