// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef SHARED_QUEUE_H
#define SHARED_QUEUE_H

#include <pthread.h>
#include <cassert>
#include <deque>

// This file provides a queue that uses a mutex and condition variable so that
// one thread can put items into the queue and another thread can pull items
// out of the queue.

// Specifies whether GetItem() should block while the queue is empty.
enum QueueWaitingFlag {
  kWait = 0,
  kDontWait
};

// Indicates if we got an item, did not wait, or if the queue was cancelled.
enum QueueGetResult {
  kReturnedItem = 0,
  kDidNotWait = 1,
  kQueueWasCancelled
};

// A simple scoped mutex lock.
// For most cases, pp::AutoLock in "ppapi/utility/threading/lock.h" can be
// used; LockingQueue needs to use the pthread_mutex_t directly in
// pthread_cond_wait so we reimplement a scoped lock here.
class ScopedLock {
 public:
  // Locks |mutex|. If pthread_mutex_lock() reports an error, the pointer is
  // dropped so that the destructor does not unlock a mutex this object never
  // acquired; in that case the guarded scope runs WITHOUT holding the lock.
  explicit ScopedLock(pthread_mutex_t* mutex) : mutex_(mutex) {
    const int kPthreadMutexSuccess = 0;
    if (pthread_mutex_lock(mutex_) != kPthreadMutexSuccess) {
      mutex_ = NULL;
    }
  }

  // Unlocks the mutex if (and only if) the constructor acquired it.
  ~ScopedLock() {
    if (mutex_ != NULL) {
      pthread_mutex_unlock(mutex_);
    }
  }

 private:
  pthread_mutex_t* mutex_;  // Weak reference, passed in to constructor.

  // Disable copy and assign.
  ScopedLock& operator=(const ScopedLock&);
  ScopedLock(const ScopedLock&);
};

// LockingQueue contains a collection of <T>, such as a collection of
// objects or pointers. The Push() method is used to add items to the
// queue in a thread-safe manner. The GetItem() is used to retrieve
// items from the queue in a thread-safe manner.
template <class T> class LockingQueue {
 public:
  // Creates an empty, non-cancelled queue and initializes the mutex and
  // condition variable with default attributes.
  LockingQueue() : quit_(false) {
    int result = pthread_mutex_init(&queue_mutex_, NULL);
    assert(result == 0);
    result = pthread_cond_init(&queue_condition_var_, NULL);
    assert(result == 0);
    // |result| is only read by assert(); keep NDEBUG builds warning-free.
    (void)result;
  }

  // Releases both synchronization primitives created by the constructor.
  // No thread may be blocked in GetItem() when the queue is destroyed.
  ~LockingQueue() {
    pthread_cond_destroy(&queue_condition_var_);
    pthread_mutex_destroy(&queue_mutex_);
  }

  // The producer (who instantiates the queue) calls this to tell the
  // consumer that the queue is no longer being used.
  void CancelQueue() {
    ScopedLock scoped_mutex(&queue_mutex_);
    quit_ = true;
    // Wake every thread waiting in GetItem so that each one sees that the
    // queue has been cancelled. Cancellation is a state change that concerns
    // all waiters, so broadcast rather than signal: signalling could leave
    // additional waiters blocked forever.
    pthread_cond_broadcast(&queue_condition_var_);
  }

  // The consumer calls this to see if the queue has been cancelled by
  // the producer. If so, the thread should not call GetItem and may
  // need to terminate -- i.e. in a case where the producer created
  // the consumer thread.
  bool IsCancelled() {
    ScopedLock scoped_mutex(&queue_mutex_);
    return quit_;
  }

  // Grabs the mutex and pushes a copy of |item| onto the end of the queue
  // (the queue has no capacity limit). Signals the condition variable so
  // that a thread that is waiting will wake up and grab the item.
  void Push(const T& item) {
    ScopedLock scoped_mutex(&queue_mutex_);
    the_queue_.push_back(item);
    pthread_cond_signal(&queue_condition_var_);
  }

  // Tries to pop the front element from the queue; returns an enum:
  //   kReturnedItem if an item is returned in |item_ptr|,
  //   kDidNotWait if |wait| was kDontWait and the queue was empty,
  //   kQueueWasCancelled if the producer called CancelQueue().
  // If |wait| is kWait, GetItem will wait to return until the queue
  // contains an item (unless the queue is cancelled).
  // |item_ptr| is written only when kReturnedItem is returned. Once the
  // queue is cancelled, items still in the queue are no longer handed out.
  QueueGetResult GetItem(T* item_ptr, QueueWaitingFlag wait) {
    ScopedLock scoped_mutex(&queue_mutex_);
    // Use a while loop to get an item. If the user does not want to wait,
    // we will exit from the loop anyway, unlocking the mutex.
    // If the user does want to wait, we will wait for pthread_cond_wait,
    // and the while loop will check is_empty_no_locking() one more
    // time so that a spurious wake-up of pthread_cond_wait is handled.
    // If |quit_| has been set, break out of the loop.
    while (!quit_ && is_empty_no_locking()) {
      // If user doesn't want to wait, return...
      if (kDontWait == wait) {
        return kDidNotWait;
      }
      // Wait for signal to occur.
      pthread_cond_wait(&queue_condition_var_, &queue_mutex_);
    }
    // Check to see if quit_ woke us up.
    if (quit_) {
      return kQueueWasCancelled;
    }

    // At this point, the queue was either not empty or, if it was empty,
    // we called pthread_cond_wait (which released the mutex, waited for the
    // signal to occur, and then atomically reacquired the mutex).
    // Thus, if we are here, the queue cannot be empty because we either
    // had the mutex and verified it was not empty, or we waited for the
    // producer to put an item in and signal a single thread (us).
    T& item = the_queue_.front();
    *item_ptr = item;
    the_queue_.pop_front();
    return kReturnedItem;
  }

 private:
  std::deque<T> the_queue_;
  bool quit_;
  pthread_mutex_t queue_mutex_;
  pthread_cond_t queue_condition_var_;

  // This is used by methods that already have the lock.
  bool is_empty_no_locking() const { return the_queue_.empty(); }

  // Disable copy and assign: copying would duplicate the raw pthread mutex
  // and condition variable, and both copies would later destroy them.
  LockingQueue& operator=(const LockingQueue&);
  LockingQueue(const LockingQueue&);
};

#endif  // SHARED_QUEUE_H