Home | History | Annotate | Download | only in ports
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/ports/message_queue.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/logging.h"
     10 #include "mojo/edk/system/ports/event.h"
     11 #include "mojo/edk/system/ports/message_filter.h"
     12 
     13 namespace mojo {
     14 namespace edk {
     15 namespace ports {
     16 
     17 inline uint64_t GetSequenceNum(const ScopedMessage& message) {
     18   return GetEventData<UserEventData>(*message)->sequence_num;
     19 }
     20 
     21 // Used by std::{push,pop}_heap functions
     22 inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) {
     23   return GetSequenceNum(a) > GetSequenceNum(b);
     24 }
     25 
     26 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
     27 
     28 MessageQueue::MessageQueue(uint64_t next_sequence_num)
     29     : next_sequence_num_(next_sequence_num) {
     30   // The message queue is blocked waiting for a message with sequence number
     31   // equal to |next_sequence_num|.
     32 }
     33 
     34 MessageQueue::~MessageQueue() {
     35 #if DCHECK_IS_ON()
     36   size_t num_leaked_ports = 0;
     37   for (const auto& message : heap_)
     38     num_leaked_ports += message->num_ports();
     39   DVLOG_IF(1, num_leaked_ports > 0)
     40       << "Leaking " << num_leaked_ports << " ports in unreceived messages";
     41 #endif
     42 }
     43 
     44 bool MessageQueue::HasNextMessage() const {
     45   return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
     46 }
     47 
     48 void MessageQueue::GetNextMessage(ScopedMessage* message,
     49                                   MessageFilter* filter) {
     50   if (!HasNextMessage() || (filter && !filter->Match(*heap_[0].get()))) {
     51     message->reset();
     52     return;
     53   }
     54 
     55   std::pop_heap(heap_.begin(), heap_.end());
     56   *message = std::move(heap_.back());
     57   heap_.pop_back();
     58 
     59   next_sequence_num_++;
     60 }
     61 
     62 void MessageQueue::AcceptMessage(ScopedMessage message,
     63                                  bool* has_next_message) {
     64   DCHECK(GetEventHeader(*message)->type == EventType::kUser);
     65 
     66   // TODO: Handle sequence number roll-over.
     67 
     68   heap_.emplace_back(std::move(message));
     69   std::push_heap(heap_.begin(), heap_.end());
     70 
     71   if (!signalable_) {
     72     *has_next_message = false;
     73   } else {
     74     *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_);
     75   }
     76 }
     77 
     78 void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) {
     79   for (const auto& message : heap_) {
     80     for (size_t i = 0; i < message->num_ports(); ++i)
     81       port_names->push_back(message->ports()[i]);
     82   }
     83 }
     84 
     85 }  // namespace ports
     86 }  // namespace edk
     87 }  // namespace mojo
     88