Home | History | Annotate | Download | only in ipc
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chromecast/media/cma/ipc/media_message_fifo.h"
      6 
      7 #include "base/atomicops.h"
      8 #include "base/bind.h"
      9 #include "base/location.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop_proxy.h"
     12 #include "chromecast/media/cma/base/cma_logging.h"
     13 #include "chromecast/media/cma/ipc/media_memory_chunk.h"
     14 #include "chromecast/media/cma/ipc/media_message.h"
     15 #include "chromecast/media/cma/ipc/media_message_type.h"
     16 
     17 namespace chromecast {
     18 namespace media {
     19 
     20 class MediaMessageFlag
     21     : public base::RefCountedThreadSafe<MediaMessageFlag> {
     22  public:
     23   // |offset| is the offset in the fifo of the media message.
     24   explicit MediaMessageFlag(size_t offset);
     25 
     26   bool IsValid() const;
     27 
     28   void Invalidate();
     29 
     30   size_t offset() const { return offset_; }
     31 
     32  private:
     33   friend class base::RefCountedThreadSafe<MediaMessageFlag>;
     34   virtual ~MediaMessageFlag();
     35 
     36   const size_t offset_;
     37   bool flag_;
     38 
     39   DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
     40 };
     41 
     42 MediaMessageFlag::MediaMessageFlag(size_t offset)
     43   : offset_(offset),
     44     flag_(true) {
     45 }
     46 
     47 MediaMessageFlag::~MediaMessageFlag() {
     48 }
     49 
     50 bool MediaMessageFlag::IsValid() const {
     51   return flag_;
     52 }
     53 
     54 void MediaMessageFlag::Invalidate() {
     55   flag_ = false;
     56 }
     57 
     58 class FifoOwnedMemory : public MediaMemoryChunk {
     59  public:
     60   FifoOwnedMemory(void* data, size_t size,
     61                   const scoped_refptr<MediaMessageFlag>& flag,
     62                   const base::Closure& release_msg_cb);
     63   virtual ~FifoOwnedMemory();
     64 
     65   // MediaMemoryChunk implementation.
     66   virtual void* data() const OVERRIDE { return data_; }
     67   virtual size_t size() const OVERRIDE { return size_; }
     68   virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
     69 
     70  private:
     71   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
     72   base::Closure release_msg_cb_;
     73 
     74   void* const data_;
     75   const size_t size_;
     76   scoped_refptr<MediaMessageFlag> flag_;
     77 
     78   DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
     79 };
     80 
     81 FifoOwnedMemory::FifoOwnedMemory(
     82     void* data, size_t size,
     83     const scoped_refptr<MediaMessageFlag>& flag,
     84     const base::Closure& release_msg_cb)
     85   : task_runner_(base::MessageLoopProxy::current()),
     86     release_msg_cb_(release_msg_cb),
     87     data_(data),
     88     size_(size),
     89     flag_(flag) {
     90 }
     91 
     92 FifoOwnedMemory::~FifoOwnedMemory() {
     93   // Release the flag before notifying that the message has been released.
     94   flag_ = scoped_refptr<MediaMessageFlag>();
     95   if (!release_msg_cb_.is_null()) {
     96     if (task_runner_->BelongsToCurrentThread()) {
     97       release_msg_cb_.Run();
     98     } else {
     99       task_runner_->PostTask(FROM_HERE, release_msg_cb_);
    100     }
    101   }
    102 }
    103 
    104 MediaMessageFifo::MediaMessageFifo(
    105     scoped_ptr<MediaMemoryChunk> mem, bool init)
    106   : mem_(mem.Pass()),
    107     weak_factory_(this) {
    108   CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
    109            0u);
    110   CHECK_GE(mem_->size(), sizeof(Descriptor));
    111   Descriptor* desc = static_cast<Descriptor*>(mem_->data());
    112   base_ = static_cast<void*>(&desc->first_item);
    113 
    114   // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
    115   // Currently, the sign differs.
    116   rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
    117   wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
    118 
    119   size_t max_size = mem_->size() -
    120       (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
    121   if (init) {
    122     size_ = max_size;
    123     desc->size = size_;
    124     internal_rd_offset_ = 0;
    125     internal_wr_offset_ = 0;
    126     base::subtle::Acquire_Store(rd_offset_, 0);
    127     base::subtle::Acquire_Store(wr_offset_, 0);
    128   } else {
    129     size_ = desc->size;
    130     CHECK_LE(size_, max_size);
    131     internal_rd_offset_ = current_rd_offset();
    132     internal_wr_offset_ = current_wr_offset();
    133   }
    134   CMALOG(kLogControl)
    135       << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
    136   CHECK_GT(size_, 0) << size_;
    137 
    138   weak_this_ = weak_factory_.GetWeakPtr();
    139   thread_checker_.DetachFromThread();
    140 }
    141 
    142 MediaMessageFifo::~MediaMessageFifo() {
    143   DCHECK(thread_checker_.CalledOnValidThread());
    144 }
    145 
    146 void MediaMessageFifo::ObserveReadActivity(
    147     const base::Closure& read_event_cb) {
    148   read_event_cb_ = read_event_cb;
    149 }
    150 
    151 void MediaMessageFifo::ObserveWriteActivity(
    152     const base::Closure& write_event_cb) {
    153   write_event_cb_ = write_event_cb;
    154 }
    155 
    156 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
    157     size_t size_to_reserve) {
    158   DCHECK(thread_checker_.CalledOnValidThread());
    159 
    160   // Capture first both the read and write offsets.
    161   // and exit right away if not enough free space.
    162   size_t wr_offset = internal_wr_offset();
    163   size_t rd_offset = current_rd_offset();
    164   size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
    165   size_t free_size = size_ - 1 - allocated_size;
    166   if (free_size < size_to_reserve)
    167     return scoped_ptr<MediaMemoryChunk>();
    168   CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
    169 
    170   // Note: in the next 2 conditions, we have:
    171   // trailing_byte_count < size_to_reserve
    172   // and since at this stage: size_to_reserve <= free_size
    173   // we also have trailing_byte_count <= free_size
    174   // which means that all the trailing bytes are free space in the fifo.
    175   size_t trailing_byte_count = size_ - wr_offset;
    176   if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
    177     // If there is no space to even write the smallest message,
    178     // skip the trailing bytes and come back to the beginning of the fifo.
    179     // (no way to insert a padding message).
    180     if (free_size < trailing_byte_count)
    181       return scoped_ptr<MediaMemoryChunk>();
    182     wr_offset = 0;
    183     CommitInternalWrite(wr_offset);
    184 
    185   } else if (trailing_byte_count < size_to_reserve) {
    186     // At this point, we know we have at least the space to write a message.
    187     // However, to avoid splitting a message, a padding message is needed.
    188     scoped_ptr<MediaMemoryChunk> mem(
    189         ReserveMemoryNoCheck(trailing_byte_count));
    190     scoped_ptr<MediaMessage> padding_message(
    191         MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
    192   }
    193 
    194   // Recalculate the free size and exit if not enough free space.
    195   wr_offset = internal_wr_offset();
    196   allocated_size = (size_ + wr_offset - rd_offset) % size_;
    197   free_size = size_ - 1 - allocated_size;
    198   if (free_size < size_to_reserve)
    199     return scoped_ptr<MediaMemoryChunk>();
    200 
    201   return ReserveMemoryNoCheck(size_to_reserve);
    202 }
    203 
    204 scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
    205   DCHECK(thread_checker_.CalledOnValidThread());
    206 
    207   // Capture the read and write offsets.
    208   size_t rd_offset = internal_rd_offset();
    209   size_t wr_offset = current_wr_offset();
    210   size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
    211 
    212   if (allocated_size < MediaMessage::minimum_msg_size())
    213     return scoped_ptr<MediaMessage>();
    214 
    215   size_t trailing_byte_count = size_ - rd_offset;
    216   if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
    217     // If there is no space to even have the smallest message,
    218     // skip the trailing bytes and come back to the beginning of the fifo.
    219     // Note: all the trailing bytes correspond to allocated bytes since:
    220     // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
    221     rd_offset = 0;
    222     allocated_size -= trailing_byte_count;
    223     trailing_byte_count = size_;
    224     CommitInternalRead(rd_offset);
    225   }
    226 
    227   // The message should not be longer than the allocated size
    228   // but since a message is a contiguous area of memory, it should also be
    229   // smaller than |trailing_byte_count|.
    230   size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
    231   if (max_msg_size < MediaMessage::minimum_msg_size())
    232     return scoped_ptr<MediaMessage>();
    233   void* msg_src = static_cast<uint8*>(base_) + rd_offset;
    234 
    235   // Create a flag to protect the serialized structure of the message
    236   // from being overwritten.
    237   // The serialized structure starts at offset |rd_offset|.
    238   scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
    239   rd_flags_.push_back(rd_flag);
    240   scoped_ptr<MediaMemoryChunk> mem(
    241       new FifoOwnedMemory(
    242           msg_src, max_msg_size, rd_flag,
    243           base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
    244 
    245   // Create the message which wraps its the serialized structure.
    246   scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
    247   CHECK(message);
    248 
    249   // Update the internal read pointer.
    250   rd_offset = (rd_offset + message->size()) % size_;
    251   CommitInternalRead(rd_offset);
    252 
    253   return message.Pass();
    254 }
    255 
    256 void MediaMessageFifo::Flush() {
    257   DCHECK(thread_checker_.CalledOnValidThread());
    258 
    259   size_t wr_offset = current_wr_offset();
    260 
    261   // Invalidate every memory region before flushing.
    262   while (!rd_flags_.empty()) {
    263     CMALOG(kLogControl) << "Invalidate flag";
    264     rd_flags_.front()->Invalidate();
    265     rd_flags_.pop_front();
    266   }
    267 
    268   // Flush by setting the read pointer to the value of the write pointer.
    269   // Update first the internal read pointer then the public one.
    270   CommitInternalRead(wr_offset);
    271   CommitRead(wr_offset);
    272 }
    273 
    274 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
    275     size_t size_to_reserve) {
    276   size_t wr_offset = internal_wr_offset();
    277 
    278   // Memory block corresponding to the serialized structure of the message.
    279   void* msg_start = static_cast<uint8*>(base_) + wr_offset;
    280   scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
    281   wr_flags_.push_back(wr_flag);
    282   scoped_ptr<MediaMemoryChunk> mem(
    283       new FifoOwnedMemory(
    284           msg_start, size_to_reserve, wr_flag,
    285           base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
    286 
    287   // Update the internal write pointer.
    288   wr_offset = (wr_offset + size_to_reserve) % size_;
    289   CommitInternalWrite(wr_offset);
    290 
    291   return mem.Pass();
    292 }
    293 
    294 void MediaMessageFifo::OnWrMemoryReleased() {
    295   DCHECK(thread_checker_.CalledOnValidThread());
    296 
    297   if (wr_flags_.empty()) {
    298     // Sanity check: when there is no protected memory area,
    299     // the external write offset has no reason to be different from
    300     // the internal write offset.
    301     DCHECK_EQ(current_wr_offset(), internal_wr_offset());
    302     return;
    303   }
    304 
    305   // Update the external write offset.
    306   while (!wr_flags_.empty() &&
    307          (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
    308     // TODO(damienv): Could add a sanity check to make sure the offset is
    309     // between the external write offset and the read offset (not included).
    310     wr_flags_.pop_front();
    311   }
    312 
    313   // Update the read offset to the first locked memory area
    314   // or to the internal read pointer if nothing prevents it.
    315   size_t external_wr_offset = internal_wr_offset();
    316   if (!wr_flags_.empty())
    317     external_wr_offset = wr_flags_.front()->offset();
    318   CommitWrite(external_wr_offset);
    319 }
    320 
    321 void MediaMessageFifo::OnRdMemoryReleased() {
    322   DCHECK(thread_checker_.CalledOnValidThread());
    323 
    324   if (rd_flags_.empty()) {
    325     // Sanity check: when there is no protected memory area,
    326     // the external read offset has no reason to be different from
    327     // the internal read offset.
    328     DCHECK_EQ(current_rd_offset(), internal_rd_offset());
    329     return;
    330   }
    331 
    332   // Update the external read offset.
    333   while (!rd_flags_.empty() &&
    334          (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
    335     // TODO(damienv): Could add a sanity check to make sure the offset is
    336     // between the external read offset and the write offset.
    337     rd_flags_.pop_front();
    338   }
    339 
    340   // Update the read offset to the first locked memory area
    341   // or to the internal read pointer if nothing prevents it.
    342   size_t external_rd_offset = internal_rd_offset();
    343   if (!rd_flags_.empty())
    344     external_rd_offset = rd_flags_.front()->offset();
    345   CommitRead(external_rd_offset);
    346 }
    347 
    348 size_t MediaMessageFifo::current_rd_offset() const {
    349   DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
    350   size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
    351   CHECK_LT(rd_offset, size_);
    352   return rd_offset;
    353 }
    354 
    355 size_t MediaMessageFifo::current_wr_offset() const {
    356   DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
    357 
    358   // When the fifo consumer acquires the write offset,
    359   // we have to make sure that any possible following reads are actually
    360   // returning results at least inline with the memory snapshot taken
    361   // when the write offset was sampled.
    362   // That's why an Acquire_Load is used here.
    363   size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
    364   CHECK_LT(wr_offset, size_);
    365   return wr_offset;
    366 }
    367 
    368 void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
    369   // Add a memory fence to ensure the message content is completely read
    370   // before updating the read offset.
    371   base::subtle::Release_Store(rd_offset_, new_rd_offset);
    372 
    373   // Make sure the read pointer has been updated before sending a notification.
    374   if (!read_event_cb_.is_null()) {
    375     base::subtle::MemoryBarrier();
    376     read_event_cb_.Run();
    377   }
    378 }
    379 
    380 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
    381   // Add a memory fence to ensure the message content is written
    382   // before updating the write offset.
    383   base::subtle::Release_Store(wr_offset_, new_wr_offset);
    384 
    385   // Make sure the write pointer has been updated before sending a notification.
    386   if (!write_event_cb_.is_null()) {
    387     base::subtle::MemoryBarrier();
    388     write_event_cb_.Run();
    389   }
    390 }
    391 
    392 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
    393   internal_rd_offset_ = new_rd_offset;
    394 }
    395 
    396 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
    397   internal_wr_offset_ = new_wr_offset;
    398 }
    399 
    400 }  // namespace media
    401 }  // namespace chromecast
    402