Home | History | Annotate | Download | only in ports
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/core/ports/message_queue.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/logging.h"
     10 #include "mojo/core/ports/message_filter.h"
     11 
     12 namespace mojo {
     13 namespace core {
     14 namespace ports {
     15 
     16 // Used by std::{push,pop}_heap functions
     17 inline bool operator<(const std::unique_ptr<UserMessageEvent>& a,
     18                       const std::unique_ptr<UserMessageEvent>& b) {
     19   return a->sequence_num() > b->sequence_num();
     20 }
     21 
     22 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
     23 
     24 MessageQueue::MessageQueue(uint64_t next_sequence_num)
     25     : next_sequence_num_(next_sequence_num) {
     26   // The message queue is blocked waiting for a message with sequence number
     27   // equal to |next_sequence_num|.
     28 }
     29 
     30 MessageQueue::~MessageQueue() {
     31 #if DCHECK_IS_ON()
     32   size_t num_leaked_ports = 0;
     33   for (const auto& message : heap_)
     34     num_leaked_ports += message->num_ports();
     35   DVLOG_IF(1, num_leaked_ports > 0)
     36       << "Leaking " << num_leaked_ports << " ports in unreceived messages";
     37 #endif
     38 }
     39 
     40 bool MessageQueue::HasNextMessage() const {
     41   return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
     42 }
     43 
     44 void MessageQueue::GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
     45                                   MessageFilter* filter) {
     46   if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
     47     message->reset();
     48     return;
     49   }
     50 
     51   std::pop_heap(heap_.begin(), heap_.end());
     52   *message = std::move(heap_.back());
     53   total_queued_bytes_ -= (*message)->GetSizeIfSerialized();
     54   heap_.pop_back();
     55 
     56   next_sequence_num_++;
     57 }
     58 
     59 void MessageQueue::AcceptMessage(std::unique_ptr<UserMessageEvent> message,
     60                                  bool* has_next_message) {
     61   // TODO: Handle sequence number roll-over.
     62 
     63   total_queued_bytes_ += message->GetSizeIfSerialized();
     64   heap_.emplace_back(std::move(message));
     65   std::push_heap(heap_.begin(), heap_.end());
     66 
     67   if (!signalable_) {
     68     *has_next_message = false;
     69   } else {
     70     *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
     71   }
     72 }
     73 
     74 void MessageQueue::TakeAllMessages(
     75     std::vector<std::unique_ptr<UserMessageEvent>>* messages) {
     76   *messages = std::move(heap_);
     77   total_queued_bytes_ = 0;
     78 }
     79 
     80 }  // namespace ports
     81 }  // namespace core
     82 }  // namespace mojo
     83