1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sdk_util/thread_pool.h" 6 7 #include <pthread.h> 8 #include <semaphore.h> 9 #include <stdio.h> 10 #include <stdlib.h> 11 12 #include "sdk_util/auto_lock.h" 13 14 namespace sdk_util { 15 16 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for 17 // num_threads, all work will be performed on the dispatch thread. 18 ThreadPool::ThreadPool(int num_threads) 19 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), 20 user_data_(NULL), user_work_function_(NULL) { 21 if (num_threads_ > 0) { 22 int status; 23 status = sem_init(&work_sem_, 0, 0); 24 if (-1 == status) { 25 fprintf(stderr, "Failed to initialize semaphore!\n"); 26 exit(-1); 27 } 28 status = sem_init(&done_sem_, 0, 0); 29 if (-1 == status) { 30 fprintf(stderr, "Failed to initialize semaphore!\n"); 31 exit(-1); 32 } 33 threads_ = new pthread_t[num_threads_]; 34 for (int i = 0; i < num_threads_; i++) { 35 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); 36 if (0 != status) { 37 fprintf(stderr, "Failed to create thread!\n"); 38 exit(-1); 39 } 40 } 41 } 42 } 43 44 // Post exit request, wait for all threads to join, and cleanup. 45 ThreadPool::~ThreadPool() { 46 if (num_threads_ > 0) { 47 PostExitAndJoinAll(); 48 delete[] threads_; 49 sem_destroy(&done_sem_); 50 sem_destroy(&work_sem_); 51 } 52 } 53 54 // Setup work parameters. This function is called from the dispatch thread, 55 // when all worker threads are sleeping. 56 void ThreadPool::Setup(int counter, WorkFunction work, void *data) { 57 counter_ = counter; 58 user_work_function_ = work; 59 user_data_ = data; 60 } 61 62 // Return decremented task counter. This function 63 // can be called from multiple threads at any given time. 64 int ThreadPool::DecCounter() { 65 return AtomicAddFetch(&counter_, -1); 66 } 67 68 // Set exit flag, post and join all the threads in the pool. This function is 69 // called only from the dispatch thread, and only when all worker threads are 70 // sleeping. 71 void ThreadPool::PostExitAndJoinAll() { 72 exiting_ = true; 73 // Wake up all the sleeping worker threads. 74 for (int i = 0; i < num_threads_; ++i) 75 sem_post(&work_sem_); 76 void* retval; 77 for (int i = 0; i < num_threads_; ++i) 78 pthread_join(threads_[i], &retval); 79 } 80 81 // Main work loop - one for each worker thread. 82 void ThreadPool::WorkLoop() { 83 while (true) { 84 // Wait for work. If no work is availble, this thread will sleep here. 85 sem_wait(&work_sem_); 86 if (exiting_) break; 87 while (true) { 88 // Grab a task index to work on from the counter. 89 int task_index = DecCounter(); 90 if (task_index < 0) 91 break; 92 user_work_function_(task_index, user_data_); 93 } 94 // Post to dispatch thread work is done. 95 sem_post(&done_sem_); 96 } 97 } 98 99 // pthread entry point for a worker thread. 100 void* ThreadPool::WorkerThreadEntry(void* thiz) { 101 static_cast<ThreadPool*>(thiz)->WorkLoop(); 102 return NULL; 103 } 104 105 // DispatchMany() will dispatch a set of tasks across worker threads. 106 // Note: This function will block until all work has completed. 107 void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) { 108 // On entry, all worker threads are sleeping. 109 Setup(num_tasks, work, data); 110 111 // Wake up the worker threads & have them process tasks. 112 for (int i = 0; i < num_threads_; i++) 113 sem_post(&work_sem_); 114 115 // Worker threads are now awake and busy. 116 117 // This dispatch thread will now sleep-wait for the worker threads to finish. 118 for (int i = 0; i < num_threads_; i++) 119 sem_wait(&done_sem_); 120 // On exit, all tasks are done and all worker threads are sleeping again. 121 } 122 123 // DispatchHere will dispatch all tasks on this thread. 124 void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) { 125 for (int i = 0; i < num_tasks; i++) 126 work(i, data); 127 } 128 129 // Dispatch() will invoke the user supplied work function across 130 // one or more threads for each task. 131 // Note: This function will block until all work has completed. 132 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { 133 if (num_threads_ > 0) 134 DispatchMany(num_tasks, work, data); 135 else 136 DispatchHere(num_tasks, work, data); 137 } 138 139 } // namespace sdk_util 140 141