/*
  This file is part of Valgrind, a dynamic binary instrumentation
  framework.

  Copyright (C) 2008-2008 Google Inc
     opensource (at) google.com

  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License as
  published by the Free Software Foundation; either version 2 of the
  License, or (at your option) any later version.

  This program is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
  02111-1307, USA.

  The GNU General Public License is contained in the file COPYING.
*/

// Author: Konstantin Serebryany <opensource (at) google.com>
//
// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or similar tool)
// that will work with different threading frameworks.
//
// If one needs to test helgrind's support for another threading library,
// he/she can create a copy of this file and replace pthread_ calls
// with appropriate calls to his/her library.
//
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h.
//
// DISCLAIMER: the classes defined in this header file
// are NOT intended for general use -- only for unit tests.
42 // 43 44 #ifndef THREAD_WRAPPERS_PTHREAD_H 45 #define THREAD_WRAPPERS_PTHREAD_H 46 47 #include <pthread.h> 48 #include <semaphore.h> 49 #include <unistd.h> 50 #include <queue> 51 #include <stdio.h> 52 #include <limits.h> // INT_MAX 53 54 #ifdef VGO_darwin 55 #include <libkern/OSAtomic.h> 56 #define NO_BARRIER 57 #define NO_TLS 58 #endif 59 60 #include <string> 61 using namespace std; 62 63 #include <sys/time.h> 64 #include <time.h> 65 66 #include "../../drd/drd.h" 67 #define ANNOTATE_NO_OP(arg) do { } while(0) 68 #define ANNOTATE_EXPECT_RACE(addr, descr) \ 69 ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race") 70 static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; } 71 72 #include <assert.h> 73 #ifdef NDEBUG 74 # error "Pleeease, do not define NDEBUG" 75 #endif 76 #define CHECK assert 77 78 /// Set this to true if malloc() uses mutex on your platform as this may 79 /// introduce a happens-before arc for a pure happens-before race detector. 80 const bool kMallocUsesMutex = false; 81 82 /// Current time in milliseconds. 83 static inline int64_t GetCurrentTimeMillis() { 84 struct timeval now; 85 gettimeofday(&now, NULL); 86 return now.tv_sec * 1000 + now.tv_usec / 1000; 87 } 88 89 /// Copy tv to ts adding offset in milliseconds. 90 static inline void timeval2timespec(timeval *const tv, 91 timespec *ts, 92 int64_t offset_milli) { 93 const int64_t ten_9 = 1000000000LL; 94 const int64_t ten_6 = 1000000LL; 95 const int64_t ten_3 = 1000LL; 96 int64_t now_nsec = (int64_t)tv->tv_sec * ten_9; 97 now_nsec += (int64_t)tv->tv_usec * ten_3; 98 int64_t then_nsec = now_nsec + offset_milli * ten_6; 99 ts->tv_sec = then_nsec / ten_9; 100 ts->tv_nsec = then_nsec % ten_9; 101 } 102 103 104 class CondVar; 105 106 #ifndef NO_SPINLOCK 107 /// helgrind does not (yet) support spin locks, so we annotate them. 
108 109 #ifndef VGO_darwin 110 class SpinLock { 111 public: 112 SpinLock() { 113 CHECK(0 == pthread_spin_init(&mu_, 0)); 114 ANNOTATE_RWLOCK_CREATE((void*)&mu_); 115 } 116 ~SpinLock() { 117 ANNOTATE_RWLOCK_DESTROY((void*)&mu_); 118 CHECK(0 == pthread_spin_destroy(&mu_)); 119 } 120 void Lock() { 121 CHECK(0 == pthread_spin_lock(&mu_)); 122 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); 123 } 124 void Unlock() { 125 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); 126 CHECK(0 == pthread_spin_unlock(&mu_)); 127 } 128 private: 129 pthread_spinlock_t mu_; 130 }; 131 132 #else 133 134 class SpinLock { 135 public: 136 // Mac OS X version. 137 SpinLock() : mu_(OS_SPINLOCK_INIT) { 138 ANNOTATE_RWLOCK_CREATE((void*)&mu_); 139 } 140 ~SpinLock() { 141 ANNOTATE_RWLOCK_DESTROY((void*)&mu_); 142 } 143 void Lock() { 144 OSSpinLockLock(&mu_); 145 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); 146 } 147 void Unlock() { 148 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); 149 OSSpinLockUnlock(&mu_); 150 } 151 private: 152 OSSpinLock mu_; 153 }; 154 #endif // VGO_darwin 155 156 #endif // NO_SPINLOCK 157 158 /// Just a boolean condition. Used by Mutex::LockWhen and similar. 159 class Condition { 160 public: 161 typedef bool (*func_t)(void*); 162 163 template <typename T> 164 Condition(bool (*func)(T*), T* arg) 165 : func_(reinterpret_cast<func_t>(func)), arg_(arg) {} 166 167 Condition(bool (*func)()) 168 : func_(reinterpret_cast<func_t>(func)), arg_(NULL) {} 169 170 bool Eval() { return func_(arg_); } 171 private: 172 func_t func_; 173 void *arg_; 174 175 }; 176 177 178 /// Wrapper for pthread_mutex_t. 179 /// 180 /// pthread_mutex_t is *not* a reader-writer lock, 181 /// so the methods like ReaderLock() aren't really reader locks. 182 /// We can not use pthread_rwlock_t because it 183 /// does not work with pthread_cond_t. 184 /// 185 /// TODO: We still need to test reader locks with this class. 
186 /// Implement a mode where pthread_rwlock_t will be used 187 /// instead of pthread_mutex_t (only when not used with CondVar or LockWhen). 188 /// 189 class Mutex { 190 friend class CondVar; 191 public: 192 Mutex() { 193 CHECK(0 == pthread_mutex_init(&mu_, NULL)); 194 CHECK(0 == pthread_cond_init(&cv_, NULL)); 195 signal_at_unlock_ = true; // Always signal at Unlock to make 196 // Mutex more friendly to hybrid detectors. 197 } 198 ~Mutex() { 199 CHECK(0 == pthread_cond_destroy(&cv_)); 200 CHECK(0 == pthread_mutex_destroy(&mu_)); 201 } 202 void Lock() { CHECK(0 == pthread_mutex_lock(&mu_));} 203 bool TryLock() { return (0 == pthread_mutex_trylock(&mu_));} 204 void Unlock() { 205 if (signal_at_unlock_) { 206 CHECK(0 == pthread_cond_signal(&cv_)); 207 } 208 CHECK(0 == pthread_mutex_unlock(&mu_)); 209 } 210 void ReaderLock() { Lock(); } 211 bool ReaderTryLock() { return TryLock();} 212 void ReaderUnlock() { Unlock(); } 213 214 void LockWhen(Condition cond) { Lock(); WaitLoop(cond); } 215 void ReaderLockWhen(Condition cond) { Lock(); WaitLoop(cond); } 216 void Await(Condition cond) { WaitLoop(cond); } 217 218 bool ReaderLockWhenWithTimeout(Condition cond, int millis) 219 { Lock(); return WaitLoopWithTimeout(cond, millis); } 220 bool LockWhenWithTimeout(Condition cond, int millis) 221 { Lock(); return WaitLoopWithTimeout(cond, millis); } 222 bool AwaitWithTimeout(Condition cond, int millis) 223 { return WaitLoopWithTimeout(cond, millis); } 224 225 private: 226 227 void WaitLoop(Condition cond) { 228 signal_at_unlock_ = true; 229 while(cond.Eval() == false) { 230 pthread_cond_wait(&cv_, &mu_); 231 } 232 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); 233 } 234 235 bool WaitLoopWithTimeout(Condition cond, int millis) { 236 struct timeval now; 237 struct timespec timeout; 238 int retcode = 0; 239 gettimeofday(&now, NULL); 240 timeval2timespec(&now, &timeout, millis); 241 242 signal_at_unlock_ = true; 243 while (cond.Eval() == false && retcode == 0) { 244 retcode = 
pthread_cond_timedwait(&cv_, &mu_, &timeout); 245 } 246 if(retcode == 0) { 247 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); 248 } 249 return cond.Eval(); 250 } 251 252 // A hack. cv_ should be the first data member so that 253 // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) works. 254 // (See also racecheck_unittest.cc) 255 pthread_cond_t cv_; 256 pthread_mutex_t mu_; 257 bool signal_at_unlock_; // Set to true if Wait was called. 258 }; 259 260 261 class MutexLock { // Scoped Mutex Locker/Unlocker 262 public: 263 MutexLock(Mutex *mu) 264 : mu_(mu) { 265 mu_->Lock(); 266 } 267 ~MutexLock() { 268 mu_->Unlock(); 269 } 270 private: 271 Mutex *mu_; 272 }; 273 274 275 /// Wrapper for pthread_cond_t. 276 class CondVar { 277 public: 278 CondVar() { CHECK(0 == pthread_cond_init(&cv_, NULL)); } 279 ~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); } 280 void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); } 281 bool WaitWithTimeout(Mutex *mu, int millis) { 282 struct timeval now; 283 struct timespec timeout; 284 gettimeofday(&now, NULL); 285 timeval2timespec(&now, &timeout, millis); 286 return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout); 287 } 288 void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); } 289 void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); } 290 private: 291 pthread_cond_t cv_; 292 }; 293 294 295 // pthreads do not allow to use condvar with rwlock so we can't make 296 // ReaderLock method of Mutex to be the real rw-lock. 297 // So, we need a special lock class to test reader locks. 
298 #define NEEDS_SEPERATE_RW_LOCK 299 class RWLock { 300 public: 301 RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); } 302 ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); } 303 void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); } 304 void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); } 305 void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } 306 void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } 307 private: 308 pthread_cond_t dummy; // Damn, this requires some redesign... 309 pthread_rwlock_t mu_; 310 }; 311 312 class ReaderLockScoped { // Scoped RWLock Locker/Unlocker 313 public: 314 ReaderLockScoped(RWLock *mu) 315 : mu_(mu) { 316 mu_->ReaderLock(); 317 } 318 ~ReaderLockScoped() { 319 mu_->ReaderUnlock(); 320 } 321 private: 322 RWLock *mu_; 323 }; 324 325 class WriterLockScoped { // Scoped RWLock Locker/Unlocker 326 public: 327 WriterLockScoped(RWLock *mu) 328 : mu_(mu) { 329 mu_->Lock(); 330 } 331 ~WriterLockScoped() { 332 mu_->Unlock(); 333 } 334 private: 335 RWLock *mu_; 336 }; 337 338 339 340 341 /// Wrapper for pthread_create()/pthread_join(). 
342 class MyThread { 343 public: 344 typedef void *(*worker_t)(void*); 345 346 MyThread(worker_t worker, void *arg = NULL, const char *name = NULL) 347 :w_(worker), arg_(arg), name_(name) {} 348 MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL) 349 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} 350 MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL) 351 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} 352 353 ~MyThread(){ w_ = NULL; arg_ = NULL;} 354 void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));} 355 void Join() { CHECK(0 == pthread_join(t_, NULL));} 356 pthread_t tid() const { return t_; } 357 private: 358 static void ThreadBody(MyThread *my_thread) { 359 if (my_thread->name_) { 360 ANNOTATE_THREAD_NAME(my_thread->name_); 361 } 362 my_thread->w_(my_thread->arg_); 363 } 364 pthread_t t_; 365 worker_t w_; 366 void *arg_; 367 const char *name_; 368 }; 369 370 371 /// Just a message queue. 372 class ProducerConsumerQueue { 373 public: 374 ProducerConsumerQueue(int unused) { 375 //ANNOTATE_PCQ_CREATE(this); 376 } 377 ~ProducerConsumerQueue() { 378 CHECK(q_.empty()); 379 //ANNOTATE_PCQ_DESTROY(this); 380 } 381 382 // Put. 383 void Put(void *item) { 384 mu_.Lock(); 385 q_.push(item); 386 ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get() 387 //ANNOTATE_PCQ_PUT(this); 388 mu_.Unlock(); 389 } 390 391 // Get. 392 // Blocks if the queue is empty. 393 void *Get() { 394 mu_.LockWhen(Condition(IsQueueNotEmpty, &q_)); 395 void * item = NULL; 396 bool ok = TryGetInternal(&item); 397 CHECK(ok); 398 mu_.Unlock(); 399 return item; 400 } 401 402 // If queue is not empty, 403 // remove an element from queue, put it into *res and return true. 404 // Otherwise return false. 
405 bool TryGet(void **res) { 406 mu_.Lock(); 407 bool ok = TryGetInternal(res); 408 mu_.Unlock(); 409 return ok; 410 } 411 412 private: 413 Mutex mu_; 414 std::queue<void*> q_; // protected by mu_ 415 416 // Requires mu_ 417 bool TryGetInternal(void ** item_ptr) { 418 if (q_.empty()) 419 return false; 420 *item_ptr = q_.front(); 421 q_.pop(); 422 //ANNOTATE_PCQ_GET(this); 423 return true; 424 } 425 426 static bool IsQueueNotEmpty(std::queue<void*> * queue) { 427 return !queue->empty(); 428 } 429 }; 430 431 432 433 /// Function pointer with zero, one or two parameters. 434 struct Closure { 435 typedef void (*F0)(); 436 typedef void (*F1)(void *arg1); 437 typedef void (*F2)(void *arg1, void *arg2); 438 int n_params; 439 void *f; 440 void *param1; 441 void *param2; 442 443 void Execute() { 444 if (n_params == 0) { 445 (F0(f))(); 446 } else if (n_params == 1) { 447 (F1(f))(param1); 448 } else { 449 CHECK(n_params == 2); 450 (F2(f))(param1, param2); 451 } 452 delete this; 453 } 454 }; 455 456 Closure *NewCallback(void (*f)()) { 457 Closure *res = new Closure; 458 res->n_params = 0; 459 res->f = (void*)(f); 460 res->param1 = NULL; 461 res->param2 = NULL; 462 return res; 463 } 464 465 template <class P1> 466 Closure *NewCallback(void (*f)(P1), P1 p1) { 467 CHECK(sizeof(P1) <= sizeof(void*)); 468 Closure *res = new Closure; 469 res->n_params = 1; 470 res->f = (void*)(f); 471 res->param1 = (void*)p1; 472 res->param2 = NULL; 473 return res; 474 } 475 476 template <class T, class P1, class P2> 477 Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) { 478 CHECK(sizeof(P1) <= sizeof(void*)); 479 Closure *res = new Closure; 480 res->n_params = 2; 481 res->f = (void*)(f); 482 res->param1 = (void*)p1; 483 res->param2 = (void*)p2; 484 return res; 485 } 486 487 /*! A thread pool that uses ProducerConsumerQueue. 
488 Usage: 489 { 490 ThreadPool pool(n_workers); 491 pool.StartWorkers(); 492 pool.Add(NewCallback(func_with_no_args)); 493 pool.Add(NewCallback(func_with_one_arg, arg)); 494 pool.Add(NewCallback(func_with_two_args, arg1, arg2)); 495 ... // more calls to pool.Add() 496 497 // the ~ThreadPool() is called: we wait workers to finish 498 // and then join all threads in the pool. 499 } 500 */ 501 class ThreadPool { 502 public: 503 //! Create n_threads threads, but do not start. 504 explicit ThreadPool(int n_threads) 505 : queue_(INT_MAX) { 506 for (int i = 0; i < n_threads; i++) { 507 MyThread *thread = new MyThread(&ThreadPool::Worker, this); 508 workers_.push_back(thread); 509 } 510 } 511 512 //! Start all threads. 513 void StartWorkers() { 514 for (size_t i = 0; i < workers_.size(); i++) { 515 workers_[i]->Start(); 516 } 517 } 518 519 //! Add a closure. 520 void Add(Closure *closure) { 521 queue_.Put(closure); 522 } 523 524 int num_threads() { return workers_.size();} 525 526 //! Wait workers to finish, then join all threads. 527 ~ThreadPool() { 528 for (size_t i = 0; i < workers_.size(); i++) { 529 Add(NULL); 530 } 531 for (size_t i = 0; i < workers_.size(); i++) { 532 workers_[i]->Join(); 533 delete workers_[i]; 534 } 535 } 536 private: 537 std::vector<MyThread*> workers_; 538 ProducerConsumerQueue queue_; 539 540 static void *Worker(void *p) { 541 ThreadPool *pool = reinterpret_cast<ThreadPool*>(p); 542 while (true) { 543 Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get()); 544 if(closure == NULL) { 545 return NULL; 546 } 547 closure->Execute(); 548 } 549 } 550 }; 551 552 #ifndef NO_BARRIER 553 /// Wrapper for pthread_barrier_t. 554 class Barrier{ 555 public: 556 explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));} 557 ~Barrier() {CHECK(0 == pthread_barrier_destroy(&b_));} 558 void Block() { 559 // helgrind 3.3.0 does not have an interceptor for barrier. 560 // but our current local version does. 
561 // ANNOTATE_CONDVAR_SIGNAL(this); 562 pthread_barrier_wait(&b_); 563 // ANNOTATE_CONDVAR_WAIT(this, this); 564 } 565 private: 566 pthread_barrier_t b_; 567 }; 568 569 #endif // NO_BARRIER 570 571 class BlockingCounter { 572 public: 573 explicit BlockingCounter(int initial_count) : 574 count_(initial_count) {} 575 bool DecrementCount() { 576 MutexLock lock(&mu_); 577 count_--; 578 return count_ == 0; 579 } 580 void Wait() { 581 mu_.LockWhen(Condition(&IsZero, &count_)); 582 mu_.Unlock(); 583 } 584 private: 585 static bool IsZero(int *arg) { return *arg == 0; } 586 Mutex mu_; 587 int count_; 588 }; 589 590 int AtomicIncrement(volatile int *value, int increment); 591 592 #ifndef VGO_darwin 593 inline int AtomicIncrement(volatile int *value, int increment) { 594 return __sync_add_and_fetch(value, increment); 595 } 596 597 #else 598 // Mac OS X version. 599 inline int AtomicIncrement(volatile int *value, int increment) { 600 return OSAtomicAdd32(increment, value); 601 } 602 603 // TODO(timurrrr) this is a hack 604 #define memalign(A,B) malloc(B) 605 606 // TODO(timurrrr) this is a hack 607 int posix_memalign(void **out, size_t al, size_t size) { 608 *out = memalign(al, size); 609 return (*out == 0); 610 } 611 #endif // VGO_darwin 612 613 #endif // THREAD_WRAPPERS_PTHREAD_H 614 // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker 615