Home | History | Annotate | Download | only in lib
      1 #pragma once
      2 
      3 /*
      4  * Copyright (C) 2017 The Android Open Source Project
      5  *
      6  * Licensed under the Apache License, Version 2.0 (the "License");
      7  * you may not use this file except in compliance with the License.
      8  * You may obtain a copy of the License at
      9  *
     10  *      http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  */
     18 
     19 #include <cerrno>
     20 #include <cstring>
     21 
     22 #include <sys/uio.h>
     23 
     24 #include "common/vsoc/lib/region_signaling_interface.h"
     25 #include "common/vsoc/shm/circqueue.h"
     26 
     27 // Increases the given index until it is naturally aligned for T.
     28 template <typename T>
     29 uintptr_t align(uintptr_t index) {
     30   return (index + sizeof(T) - 1) & ~(sizeof(T) - 1);
     31 }
     32 
     33 namespace vsoc {
     34 class RegionSignalingInterface;
     35 namespace layout {
     36 
     37 template <uint32_t SizeLog2>
     38 void CircularQueueBase<SizeLog2>::CopyInRange(const char* buffer_in,
     39                                               const Range& t) {
     40   size_t bytes = t.end_idx - t.start_idx;
     41   uint32_t index = t.start_idx & (BufferSize - 1);
     42   if (index + bytes <= BufferSize) {
     43     std::memcpy(buffer_ + index, buffer_in, bytes);
     44   } else {
     45     size_t part1_size = BufferSize - index;
     46     size_t part2_size = bytes - part1_size;
     47     std::memcpy(buffer_ + index, buffer_in, part1_size);
     48     std::memcpy(buffer_, buffer_in + part1_size, part2_size);
     49   }
     50 }
     51 
     52 template <uint32_t SizeLog2>
     53 void CircularQueueBase<SizeLog2>::CopyOutRange(const Range& t,
     54                                                char* buffer_out) {
     55   uint32_t index = t.start_idx & (BufferSize - 1);
     56   size_t total_size = t.end_idx - t.start_idx;
     57   if (index + total_size <= BufferSize) {
     58     std::memcpy(buffer_out, buffer_ + index, total_size);
     59   } else {
     60     uint32_t part1_size = BufferSize - index;
     61     uint32_t part2_size = total_size - part1_size;
     62     std::memcpy(buffer_out, buffer_ + index, part1_size);
     63     std::memcpy(buffer_out + part1_size, buffer_, part2_size);
     64   }
     65 }
     66 
     67 template <uint32_t SizeLog2>
     68 void CircularQueueBase<SizeLog2>::WaitForDataLocked(
     69     RegionSignalingInterface* r) {
     70   while (1) {
     71     uint32_t o_w_pub = w_pub_;
     72     // We don't have data. Wait until some appears and try again
     73     if (r_released_ != o_w_pub) {
     74       return;
     75     }
     76     lock_.Unlock();
     77     r->WaitForSignal(&w_pub_, o_w_pub);
     78     lock_.Lock();
     79   }
     80 }
     81 
     82 template <uint32_t SizeLog2>
     83 intptr_t CircularQueueBase<SizeLog2>::WriteReserveLocked(
     84     RegionSignalingInterface* r, size_t bytes, Range* t, bool non_blocking) {
     85   // Can't write more than the buffer will hold
     86   if (bytes > BufferSize) {
     87     return -ENOSPC;
     88   }
     89   while (true) {
     90     uint32_t o_w_pub = w_pub_;
     91     uint32_t o_r_release = r_released_;
     92     uint32_t bytes_in_use = o_w_pub - o_r_release;
     93     size_t available = BufferSize - bytes_in_use;
     94     if (available >= bytes) {
     95       t->start_idx = o_w_pub;
     96       t->end_idx = o_w_pub + bytes;
     97       break;
     98     }
     99     if (non_blocking) {
    100       return -EWOULDBLOCK;
    101     }
    102     // If we can't write at the moment wait for a reader to release
    103     // some bytes.
    104     lock_.Unlock();
    105     r->WaitForSignal(&r_released_, o_r_release);
    106     lock_.Lock();
    107   }
    108   return t->end_idx - t->start_idx;
    109 }
    110 
    111 template <uint32_t SizeLog2>
    112 intptr_t CircularByteQueue<SizeLog2>::Read(RegionSignalingInterface* r,
    113                                            char* buffer_out, size_t max_size) {
    114   this->lock_.Lock();
    115   this->WaitForDataLocked(r);
    116   Range t;
    117   t.start_idx = this->r_released_;
    118   t.end_idx = this->w_pub_;
    119   // The lock is still held here...
    120   // Trim the range if we got more than the reader wanted
    121   if ((t.end_idx - t.start_idx) > max_size) {
    122     t.end_idx = t.start_idx + max_size;
    123   }
    124   this->CopyOutRange(t, buffer_out);
    125   this->r_released_ = t.end_idx;
    126   this->lock_.Unlock();
    127   r->SendSignal(layout::Sides::Both, &this->r_released_);
    128   return t.end_idx - t.start_idx;
    129 }
    130 
    131 template <uint32_t SizeLog2>
    132 intptr_t CircularByteQueue<SizeLog2>::Write(RegionSignalingInterface* r,
    133                                             const char* buffer_in, size_t bytes,
    134                                             bool non_blocking) {
    135   Range range;
    136   this->lock_.Lock();
    137   intptr_t rval = this->WriteReserveLocked(r, bytes, &range, non_blocking);
    138   if (rval < 0) {
    139     this->lock_.Unlock();
    140     return rval;
    141   }
    142   this->CopyInRange(buffer_in, range);
    143   // We can't publish until all of the previous write allocations where
    144   // published.
    145   this->w_pub_ = range.end_idx;
    146   this->lock_.Unlock();
    147   r->SendSignal(layout::Sides::Both, &this->w_pub_);
    148   return bytes;
    149 }
    150 
    151 template <uint32_t SizeLog2, uint32_t MaxPacketSize>
    152 intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::CalculateBufferedSize(
    153     size_t payload) {
    154   return align<uint32_t>(sizeof(uint32_t) + payload);
    155 }
    156 
    157 template <uint32_t SizeLog2, uint32_t MaxPacketSize>
    158 intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Read(
    159     RegionSignalingInterface* r, char* buffer_out, size_t max_size) {
    160   this->lock_.Lock();
    161   this->WaitForDataLocked(r);
    162   uint32_t packet_size = *reinterpret_cast<uint32_t*>(
    163       this->buffer_ + (this->r_released_ & (this->BufferSize - 1)));
    164   if (packet_size > max_size) {
    165     this->lock_.Unlock();
    166     return -ENOSPC;
    167   }
    168   Range t;
    169   t.start_idx = this->r_released_ + sizeof(uint32_t);
    170   t.end_idx = t.start_idx + packet_size;
    171   this->CopyOutRange(t, buffer_out);
    172   this->r_released_ += this->CalculateBufferedSize(packet_size);
    173   this->lock_.Unlock();
    174   r->SendSignal(layout::Sides::Both, &this->r_released_);
    175   return packet_size;
    176 }
    177 
    178 template <uint32_t SizeLog2, uint32_t MaxPacketSize>
    179 intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Write(
    180     RegionSignalingInterface* r, const char* buffer_in, uint32_t bytes,
    181     bool non_blocking) {
    182   iovec iov;
    183   iov.iov_base = const_cast<char *>(buffer_in);
    184   iov.iov_len = bytes;
    185   return Writev(r, &iov, 1 /* iov_count */, non_blocking);
    186 }
    187 
    188 template <uint32_t SizeLog2, uint32_t MaxPacketSize>
    189 intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Writev(
    190       RegionSignalingInterface *r,
    191       const iovec *iov,
    192       size_t iov_count,
    193       bool non_blocking) {
    194   size_t bytes = 0;
    195   for (size_t i = 0; i < iov_count; ++i) {
    196     bytes += iov[i].iov_len;
    197   }
    198 
    199   if (bytes > MaxPacketSize) {
    200     return -ENOSPC;
    201   }
    202 
    203   Range range;
    204   size_t buffered_size = this->CalculateBufferedSize(bytes);
    205   this->lock_.Lock();
    206   intptr_t rval =
    207       this->WriteReserveLocked(r, buffered_size, &range, non_blocking);
    208   if (rval < 0) {
    209     this->lock_.Unlock();
    210     return rval;
    211   }
    212   Range header = range;
    213   header.end_idx = header.start_idx + sizeof(uint32_t);
    214   Range payload{
    215       static_cast<uint32_t>(range.start_idx + sizeof(uint32_t)),
    216       static_cast<uint32_t>(range.start_idx + sizeof(uint32_t) + bytes)};
    217   this->CopyInRange(reinterpret_cast<const char*>(&bytes), header);
    218 
    219   Range subRange = payload;
    220   for (size_t i = 0; i < iov_count; ++i) {
    221     subRange.end_idx = subRange.start_idx + iov[i].iov_len;
    222     this->CopyInRange(static_cast<const char *>(iov[i].iov_base), subRange);
    223 
    224     subRange.start_idx = subRange.end_idx;
    225   }
    226 
    227   this->w_pub_ = range.end_idx;
    228   this->lock_.Unlock();
    229   r->SendSignal(layout::Sides::Both, &this->w_pub_);
    230   return bytes;
    231 }
    232 
    233 }  // namespace layout
    234 }  // namespace vsoc
    235