1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/ports/message_queue.h" 6 7 #include <algorithm> 8 9 #include "base/logging.h" 10 #include "mojo/edk/system/ports/event.h" 11 #include "mojo/edk/system/ports/message_filter.h" 12 13 namespace mojo { 14 namespace edk { 15 namespace ports { 16 17 inline uint64_t GetSequenceNum(const ScopedMessage& message) { 18 return GetEventData<UserEventData>(*message)->sequence_num; 19 } 20 21 // Used by std::{push,pop}_heap functions 22 inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) { 23 return GetSequenceNum(a) > GetSequenceNum(b); 24 } 25 26 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} 27 28 MessageQueue::MessageQueue(uint64_t next_sequence_num) 29 : next_sequence_num_(next_sequence_num) { 30 // The message queue is blocked waiting for a message with sequence number 31 // equal to |next_sequence_num|. 32 } 33 34 MessageQueue::~MessageQueue() { 35 #if DCHECK_IS_ON() 36 size_t num_leaked_ports = 0; 37 for (const auto& message : heap_) 38 num_leaked_ports += message->num_ports(); 39 DVLOG_IF(1, num_leaked_ports > 0) 40 << "Leaking " << num_leaked_ports << " ports in unreceived messages"; 41 #endif 42 } 43 44 bool MessageQueue::HasNextMessage() const { 45 return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_; 46 } 47 48 void MessageQueue::GetNextMessage(ScopedMessage* message, 49 MessageFilter* filter) { 50 if (!HasNextMessage() || (filter && !filter->Match(*heap_[0].get()))) { 51 message->reset(); 52 return; 53 } 54 55 std::pop_heap(heap_.begin(), heap_.end()); 56 *message = std::move(heap_.back()); 57 heap_.pop_back(); 58 59 next_sequence_num_++; 60 } 61 62 void MessageQueue::AcceptMessage(ScopedMessage message, 63 bool* has_next_message) { 64 DCHECK(GetEventHeader(*message)->type == EventType::kUser); 65 66 // TODO: Handle sequence number roll-over. 67 68 heap_.emplace_back(std::move(message)); 69 std::push_heap(heap_.begin(), heap_.end()); 70 71 if (!signalable_) { 72 *has_next_message = false; 73 } else { 74 *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_); 75 } 76 } 77 78 void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) { 79 for (const auto& message : heap_) { 80 for (size_t i = 0; i < message->num_ports(); ++i) 81 port_names->push_back(message->ports()[i]); 82 } 83 } 84 85 } // namespace ports 86 } // namespace edk 87 } // namespace mojo 88