1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/core/ports/message_queue.h" 6 7 #include <algorithm> 8 9 #include "base/logging.h" 10 #include "mojo/core/ports/message_filter.h" 11 12 namespace mojo { 13 namespace core { 14 namespace ports { 15 16 // Used by std::{push,pop}_heap functions 17 inline bool operator<(const std::unique_ptr<UserMessageEvent>& a, 18 const std::unique_ptr<UserMessageEvent>& b) { 19 return a->sequence_num() > b->sequence_num(); 20 } 21 22 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} 23 24 MessageQueue::MessageQueue(uint64_t next_sequence_num) 25 : next_sequence_num_(next_sequence_num) { 26 // The message queue is blocked waiting for a message with sequence number 27 // equal to |next_sequence_num|. 28 } 29 30 MessageQueue::~MessageQueue() { 31 #if DCHECK_IS_ON() 32 size_t num_leaked_ports = 0; 33 for (const auto& message : heap_) 34 num_leaked_ports += message->num_ports(); 35 DVLOG_IF(1, num_leaked_ports > 0) 36 << "Leaking " << num_leaked_ports << " ports in unreceived messages"; 37 #endif 38 } 39 40 bool MessageQueue::HasNextMessage() const { 41 return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_; 42 } 43 44 void MessageQueue::GetNextMessage(std::unique_ptr<UserMessageEvent>* message, 45 MessageFilter* filter) { 46 if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) { 47 message->reset(); 48 return; 49 } 50 51 std::pop_heap(heap_.begin(), heap_.end()); 52 *message = std::move(heap_.back()); 53 total_queued_bytes_ -= (*message)->GetSizeIfSerialized(); 54 heap_.pop_back(); 55 56 next_sequence_num_++; 57 } 58 59 void MessageQueue::AcceptMessage(std::unique_ptr<UserMessageEvent> message, 60 bool* has_next_message) { 61 // TODO: Handle sequence number roll-over. 62 63 total_queued_bytes_ += message->GetSizeIfSerialized(); 64 heap_.emplace_back(std::move(message)); 65 std::push_heap(heap_.begin(), heap_.end()); 66 67 if (!signalable_) { 68 *has_next_message = false; 69 } else { 70 *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_); 71 } 72 } 73 74 void MessageQueue::TakeAllMessages( 75 std::vector<std::unique_ptr<UserMessageEvent>>* messages) { 76 *messages = std::move(heap_); 77 total_queued_bytes_ = 0; 78 } 79 80 } // namespace ports 81 } // namespace core 82 } // namespace mojo 83