      1 /*
      2   This file is part of Valgrind, a dynamic binary instrumentation
      3   framework.
      4 
      5   Copyright (C) 2008-2008 Google Inc
     opensource@google.com
      7 
      8   This program is free software; you can redistribute it and/or
      9   modify it under the terms of the GNU General Public License as
     10   published by the Free Software Foundation; either version 2 of the
     11   License, or (at your option) any later version.
     12 
     13   This program is distributed in the hope that it will be useful, but
     14   WITHOUT ANY WARRANTY; without even the implied warranty of
     15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     16   General Public License for more details.
     17 
     18   You should have received a copy of the GNU General Public License
     19   along with this program; if not, write to the Free Software
     20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
     21   02111-1307, USA.
     22 
     23   The GNU General Public License is contained in the file COPYING.
     24 */
     25 
// Author: Konstantin Serebryany <opensource@google.com>
     27 //
// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or a similar tool)
// that will work with different threading frameworks.
//
// To test helgrind's support for another threading library, make a copy
// of this file and replace the pthread_ calls with the appropriate calls
// to that library.
//
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h.
     39 //
     40 // DISCLAIMER: the classes defined in this header file
     41 // are NOT intended for general use -- only for unit tests.
     42 //
     43 
     44 #ifndef THREAD_WRAPPERS_PTHREAD_H
     45 #define THREAD_WRAPPERS_PTHREAD_H
     46 
     47 #include <pthread.h>
     48 #include <semaphore.h>
     49 #include <unistd.h>
#include <queue>
#include <vector>
     51 #include <stdio.h>
     52 #include <limits.h>   // INT_MAX
     53 
     54 #ifdef VGO_darwin
     55 #include <libkern/OSAtomic.h>
     56 #define NO_BARRIER
     57 #define NO_TLS
     58 #endif
     59 
     60 #include <string>
     61 using namespace std;
     62 
     63 #include <sys/time.h>
     64 #include <time.h>
     65 
     66 #include "../../drd/drd.h"
     67 #define ANNOTATE_NO_OP(arg) do { } while(0)
     68 #define ANNOTATE_EXPECT_RACE(addr, descr)                \
     69     ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race")
     70 static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; }
     71 
     72 #include <assert.h>
     73 #ifdef NDEBUG
     74 # error "Pleeease, do not define NDEBUG"
     75 #endif
     76 #define CHECK assert
     77 
/// Set this to true if malloc() uses a mutex on your platform, as that may
/// introduce a happens-before arc for a pure happens-before race detector.
     80 const bool kMallocUsesMutex = false;
     81 
/// Current time in milliseconds.
static inline int64_t GetCurrentTimeMillis() {
  struct timeval now;
  gettimeofday(&now, NULL);
  // Widen tv_sec to int64_t before multiplying so the result does not
  // overflow on platforms where time_t is 32 bits wide.
  return (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
}
     88 
     89 /// Copy tv to ts adding offset in milliseconds.
     90 static inline void timeval2timespec(timeval *const tv,
     91                                      timespec *ts,
     92                                      int64_t offset_milli) {
     93   const int64_t ten_9 = 1000000000LL;
     94   const int64_t ten_6 = 1000000LL;
     95   const int64_t ten_3 = 1000LL;
     96   int64_t now_nsec = (int64_t)tv->tv_sec * ten_9;
     97   now_nsec += (int64_t)tv->tv_usec * ten_3;
     98   int64_t then_nsec = now_nsec + offset_milli * ten_6;
     99   ts->tv_sec  = then_nsec / ten_9;
    100   ts->tv_nsec = then_nsec % ten_9;
    101 }
    102 
    103 
    104 class CondVar;
    105 
    106 #ifndef NO_SPINLOCK
    107 /// helgrind does not (yet) support spin locks, so we annotate them.
    108 
    109 #ifndef VGO_darwin
    110 class SpinLock {
    111  public:
    112   SpinLock() {
    113     CHECK(0 == pthread_spin_init(&mu_, 0));
    114     ANNOTATE_RWLOCK_CREATE((void*)&mu_);
    115   }
    116   ~SpinLock() {
    117     ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
    118     CHECK(0 == pthread_spin_destroy(&mu_));
    119   }
    120   void Lock() {
    121     CHECK(0 == pthread_spin_lock(&mu_));
    122     ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
    123   }
    124   void Unlock() {
    125     ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
    126     CHECK(0 == pthread_spin_unlock(&mu_));
    127   }
    128  private:
    129   pthread_spinlock_t mu_;
    130 };
    131 
    132 #else
    133 
    134 class SpinLock {
    135  public:
    136   // Mac OS X version.
    137   SpinLock() : mu_(OS_SPINLOCK_INIT) {
    138     ANNOTATE_RWLOCK_CREATE((void*)&mu_);
    139   }
    140   ~SpinLock() {
    141     ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
    142   }
    143   void Lock() {
    144     OSSpinLockLock(&mu_);
    145     ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
    146   }
    147   void Unlock() {
    148     ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
    149     OSSpinLockUnlock(&mu_);
    150   }
    151  private:
    152   OSSpinLock mu_;
    153 };
    154 #endif // VGO_darwin
    155 
    156 #endif // NO_SPINLOCK
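
// Example (illustrative only; 'counter' is a hypothetical shared variable):
//
//   SpinLock sl;
//   int counter = 0;
//
//   sl.Lock();
//   counter++;     // keep the critical section short; spin locks busy-wait
//   sl.Unlock();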
    157 
    158 /// Just a boolean condition. Used by Mutex::LockWhen and similar.
    159 class Condition {
    160  public:
    161   typedef bool (*func_t)(void*);
    162 
    163   template <typename T>
    164   Condition(bool (*func)(T*), T* arg)
    165   : func_(reinterpret_cast<func_t>(func)), arg_(arg) {}
    166 
    167   Condition(bool (*func)())
    168   : func_(reinterpret_cast<func_t>(func)), arg_(NULL) {}
    169 
    170   bool Eval() { return func_(arg_); }
    171  private:
    172   func_t func_;
    173   void *arg_;
    174 
    175 };
    176 
    177 
    178 /// Wrapper for pthread_mutex_t.
    179 ///
    180 /// pthread_mutex_t is *not* a reader-writer lock,
/// so methods like ReaderLock() aren't really reader locks.
/// We cannot use pthread_rwlock_t because it
/// does not work with pthread_cond_t.
    184 ///
    185 /// TODO: We still need to test reader locks with this class.
    186 /// Implement a mode where pthread_rwlock_t will be used
    187 /// instead of pthread_mutex_t (only when not used with CondVar or LockWhen).
    188 ///
    189 class Mutex {
    190   friend class CondVar;
    191  public:
    192   Mutex() {
    193     CHECK(0 == pthread_mutex_init(&mu_, NULL));
    194     CHECK(0 == pthread_cond_init(&cv_, NULL));
    195     signal_at_unlock_ = true;  // Always signal at Unlock to make
    196                                // Mutex more friendly to hybrid detectors.
    197   }
    198   ~Mutex() {
    199     CHECK(0 == pthread_cond_destroy(&cv_));
    200     CHECK(0 == pthread_mutex_destroy(&mu_));
    201   }
    202   void Lock()          { CHECK(0 == pthread_mutex_lock(&mu_));}
    203   bool TryLock()       { return (0 == pthread_mutex_trylock(&mu_));}
    204   void Unlock() {
    205     if (signal_at_unlock_) {
    206       CHECK(0 == pthread_cond_signal(&cv_));
    207     }
    208     CHECK(0 == pthread_mutex_unlock(&mu_));
    209   }
    210   void ReaderLock()    { Lock(); }
    211   bool ReaderTryLock() { return TryLock();}
    212   void ReaderUnlock()  { Unlock(); }
    213 
    214   void LockWhen(Condition cond)            { Lock(); WaitLoop(cond); }
    215   void ReaderLockWhen(Condition cond)      { Lock(); WaitLoop(cond); }
    216   void Await(Condition cond)               { WaitLoop(cond); }
    217 
    218   bool ReaderLockWhenWithTimeout(Condition cond, int millis)
    219     { Lock(); return WaitLoopWithTimeout(cond, millis); }
    220   bool LockWhenWithTimeout(Condition cond, int millis)
    221     { Lock(); return WaitLoopWithTimeout(cond, millis); }
    222   bool AwaitWithTimeout(Condition cond, int millis)
    223     { return WaitLoopWithTimeout(cond, millis); }
    224 
    225  private:
    226 
    227   void WaitLoop(Condition cond) {
    228     signal_at_unlock_ = true;
    229     while(cond.Eval() == false) {
    230       pthread_cond_wait(&cv_, &mu_);
    231     }
    232     ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
    233   }
    234 
    235   bool WaitLoopWithTimeout(Condition cond, int millis) {
    236     struct timeval now;
    237     struct timespec timeout;
    238     int retcode = 0;
    239     gettimeofday(&now, NULL);
    240     timeval2timespec(&now, &timeout, millis);
    241 
    242     signal_at_unlock_ = true;
    243     while (cond.Eval() == false && retcode == 0) {
    244       retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout);
    245     }
    246     if(retcode == 0) {
    247       ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
    248     }
    249     return cond.Eval();
    250   }
    251 
    252   // A hack. cv_ should be the first data member so that
  // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) work.
    254   // (See also racecheck_unittest.cc)
    255   pthread_cond_t  cv_;
    256   pthread_mutex_t mu_;
    257   bool            signal_at_unlock_;  // Set to true if Wait was called.
    258 };
    259 
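// Example (illustrative only; 'flag', 'IsTrue', 'Waiter' and 'Setter' are
// hypothetical):
//
//   static bool IsTrue(bool *b) { return *b; }
//
//   bool  flag = false;
//   Mutex mu;
//
//   void Waiter() {                            // runs in one thread
//     mu.LockWhen(Condition(IsTrue, &flag));   // blocks until flag == true
//     mu.Unlock();
//   }
//
//   void Setter() {                            // runs in another thread
//     mu.Lock();
//     flag = true;                             // Unlock() signals the waiter
//     mu.Unlock();
//   }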
    260 
    261 class MutexLock {  // Scoped Mutex Locker/Unlocker
    262  public:
    263   MutexLock(Mutex *mu)
    264     : mu_(mu) {
    265     mu_->Lock();
    266   }
    267   ~MutexLock() {
    268     mu_->Unlock();
    269   }
    270  private:
    271   Mutex *mu_;
    272 };
    273 
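// Example (illustrative only; 'counter' is a hypothetical shared variable):
//
//   Mutex mu;
//   int counter = 0;
//
//   void Increment() {
//     MutexLock lock(&mu);   // mu is released when 'lock' goes out of scope
//     counter++;
//   }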
    274 
    275 /// Wrapper for pthread_cond_t.
    276 class CondVar {
    277  public:
    278   CondVar()   { CHECK(0 == pthread_cond_init(&cv_, NULL)); }
    279   ~CondVar()  { CHECK(0 == pthread_cond_destroy(&cv_)); }
    280   void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); }
  // Returns true if the wait timed out (or otherwise failed) before the
  // condition variable was signalled.
  bool WaitWithTimeout(Mutex *mu, int millis) {
    282     struct timeval now;
    283     struct timespec timeout;
    284     gettimeofday(&now, NULL);
    285     timeval2timespec(&now, &timeout, millis);
    286     return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout);
    287   }
    288   void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); }
    289   void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); }
    290  private:
    291   pthread_cond_t cv_;
    292 };
    293 
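// Example (illustrative only; 'queue_not_empty()' is a hypothetical predicate):
//
//   Mutex   mu;
//   CondVar cv;
//
//   // Consumer:
//   mu.Lock();
//   while (!queue_not_empty())
//     cv.Wait(&mu);            // atomically releases mu and waits
//   mu.Unlock();
//
//   // Producer:
//   mu.Lock();
//   // ... add an item ...
//   cv.Signal();               // SignalAll() would wake all waiters
//   mu.Unlock();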
    294 
// pthreads do not allow a condvar to be used with an rwlock, so we can't
// make the ReaderLock() method of Mutex a real reader lock.
// Hence we need a separate lock class to test reader locks.
    298 #define NEEDS_SEPERATE_RW_LOCK
    299 class RWLock {
    300  public:
    301   RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); }
    302   ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); }
    303   void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); }
    304   void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); }
    305   void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
    306   void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
    307  private:
    308   pthread_cond_t dummy; // Damn, this requires some redesign...
    309   pthread_rwlock_t mu_;
    310 };
    311 
    312 class ReaderLockScoped {  // Scoped RWLock Locker/Unlocker
    313  public:
    314   ReaderLockScoped(RWLock *mu)
    315     : mu_(mu) {
    316     mu_->ReaderLock();
    317   }
    318   ~ReaderLockScoped() {
    319     mu_->ReaderUnlock();
    320   }
    321  private:
    322   RWLock *mu_;
    323 };
    324 
    325 class WriterLockScoped {  // Scoped RWLock Locker/Unlocker
    326  public:
    327   WriterLockScoped(RWLock *mu)
    328     : mu_(mu) {
    329     mu_->Lock();
    330   }
    331   ~WriterLockScoped() {
    332     mu_->Unlock();
    333   }
    334  private:
    335   RWLock *mu_;
    336 };
    337 
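// Example (illustrative only; 'table' is a hypothetical shared structure):
//
//   RWLock lock;
//
//   {
//     ReaderLockScoped r(&lock);   // many readers may hold the lock at once
//     // ... read 'table' ...
//   }
//   {
//     WriterLockScoped w(&lock);   // a writer holds the lock exclusively
//     // ... modify 'table' ...
//   }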
    338 
    339 
    340 
    341 /// Wrapper for pthread_create()/pthread_join().
    342 class MyThread {
    343  public:
    344   typedef void *(*worker_t)(void*);
    345 
    346   MyThread(worker_t worker, void *arg = NULL, const char *name = NULL)
    347       :w_(worker), arg_(arg), name_(name) {}
    348   MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL)
    349       :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
    350   MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL)
    351       :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
    352 
    353   ~MyThread(){ w_ = NULL; arg_ = NULL;}
    354   void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));}
    355   void Join()  { CHECK(0 == pthread_join(t_, NULL));}
    356   pthread_t tid() const { return t_; }
    357  private:
    358   static void ThreadBody(MyThread *my_thread) {
    359     if (my_thread->name_) {
    360       ANNOTATE_THREAD_NAME(my_thread->name_);
    361     }
    362     my_thread->w_(my_thread->arg_);
    363   }
    364   pthread_t t_;
    365   worker_t  w_;
    366   void     *arg_;
    367   const char *name_;
    368 };
    369 
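// Example (illustrative only; 'Worker' is a hypothetical function):
//
//   void *Worker(void *arg) { /* ... */ return NULL; }
//
//   MyThread t(Worker, NULL, "worker-1");
//   t.Start();   // calls pthread_create()
//   t.Join();    // calls pthread_join()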
    370 
    371 /// Just a message queue.
    372 class ProducerConsumerQueue {
    373  public:
    374   ProducerConsumerQueue(int unused) {
    375     //ANNOTATE_PCQ_CREATE(this);
    376   }
    377   ~ProducerConsumerQueue() {
    378     CHECK(q_.empty());
    379     //ANNOTATE_PCQ_DESTROY(this);
    380   }
    381 
    382   // Put.
    383   void Put(void *item) {
    384     mu_.Lock();
    385       q_.push(item);
    386       ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
    387       //ANNOTATE_PCQ_PUT(this);
    388     mu_.Unlock();
    389   }
    390 
    391   // Get.
    392   // Blocks if the queue is empty.
    393   void *Get() {
    394     mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
    395       void * item = NULL;
    396       bool ok = TryGetInternal(&item);
    397       CHECK(ok);
    398     mu_.Unlock();
    399     return item;
    400   }
    401 
  // If the queue is not empty,
  // remove an element from the queue, put it into *res and return true.
  // Otherwise return false.
    405   bool TryGet(void **res) {
    406     mu_.Lock();
    407       bool ok = TryGetInternal(res);
    408     mu_.Unlock();
    409     return ok;
    410   }
    411 
    412  private:
    413   Mutex mu_;
    414   std::queue<void*> q_; // protected by mu_
    415 
    416   // Requires mu_
    417   bool TryGetInternal(void ** item_ptr) {
    418     if (q_.empty())
    419       return false;
    420     *item_ptr = q_.front();
    421     q_.pop();
    422     //ANNOTATE_PCQ_GET(this);
    423     return true;
    424   }
    425 
    426   static bool IsQueueNotEmpty(std::queue<void*> * queue) {
    427      return !queue->empty();
    428   }
    429 };
    430 
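// Example (illustrative only): passing heap-allocated items between threads.
//
//   ProducerConsumerQueue q(0 /* unused */);
//
//   // Producer thread:
//   q.Put(new int(42));
//
//   // Consumer thread (Get() blocks while the queue is empty):
//   int *item = static_cast<int*>(q.Get());
//   delete item;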
    431 
    432 
    433 /// Function pointer with zero, one or two parameters.
    434 struct Closure {
    435   typedef void (*F0)();
    436   typedef void (*F1)(void *arg1);
    437   typedef void (*F2)(void *arg1, void *arg2);
    438   int  n_params;
    439   void *f;
    440   void *param1;
    441   void *param2;
    442 
    443   void Execute() {
    444     if (n_params == 0) {
    445       (F0(f))();
    446     } else if (n_params == 1) {
    447       (F1(f))(param1);
    448     } else {
    449       CHECK(n_params == 2);
    450       (F2(f))(param1, param2);
    451     }
    452     delete this;
    453   }
    454 };
    455 
    456 Closure *NewCallback(void (*f)()) {
    457   Closure *res = new Closure;
    458   res->n_params = 0;
    459   res->f = (void*)(f);
    460   res->param1 = NULL;
    461   res->param2 = NULL;
    462   return res;
    463 }
    464 
    465 template <class P1>
    466 Closure *NewCallback(void (*f)(P1), P1 p1) {
    467   CHECK(sizeof(P1) <= sizeof(void*));
    468   Closure *res = new Closure;
    469   res->n_params = 1;
    470   res->f = (void*)(f);
    471   res->param1 = (void*)p1;
    472   res->param2 = NULL;
    473   return res;
    474 }
    475 
template <class P1, class P2>
Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
  CHECK(sizeof(P1) <= sizeof(void*));
  CHECK(sizeof(P2) <= sizeof(void*));
    479   Closure *res = new Closure;
    480   res->n_params = 2;
    481   res->f = (void*)(f);
    482   res->param1 = (void*)p1;
    483   res->param2 = (void*)p2;
    484   return res;
    485 }
    486 
    487 /*! A thread pool that uses ProducerConsumerQueue.
    488   Usage:
    489   {
    490     ThreadPool pool(n_workers);
    491     pool.StartWorkers();
    492     pool.Add(NewCallback(func_with_no_args));
    493     pool.Add(NewCallback(func_with_one_arg, arg));
    494     pool.Add(NewCallback(func_with_two_args, arg1, arg2));
    495     ... // more calls to pool.Add()
    496 
    // ~ThreadPool() is called here: we wait for the workers to finish
    498     // and then join all threads in the pool.
    499   }
    500 */
    501 class ThreadPool {
    502  public:
  //! Create n_threads threads, but do not start them.
    504   explicit ThreadPool(int n_threads)
    505     : queue_(INT_MAX) {
    506     for (int i = 0; i < n_threads; i++) {
    507       MyThread *thread = new MyThread(&ThreadPool::Worker, this);
    508       workers_.push_back(thread);
    509     }
    510   }
    511 
    512   //! Start all threads.
    513   void StartWorkers() {
    514     for (size_t i = 0; i < workers_.size(); i++) {
    515       workers_[i]->Start();
    516     }
    517   }
    518 
    519   //! Add a closure.
    520   void Add(Closure *closure) {
    521     queue_.Put(closure);
    522   }
    523 
    524   int num_threads() { return workers_.size();}
    525 
  //! Wait for the workers to finish, then join all threads.
    527   ~ThreadPool() {
    528     for (size_t i = 0; i < workers_.size(); i++) {
    529       Add(NULL);
    530     }
    531     for (size_t i = 0; i < workers_.size(); i++) {
    532       workers_[i]->Join();
    533       delete workers_[i];
    534     }
    535   }
    536  private:
    537   std::vector<MyThread*>   workers_;
    538   ProducerConsumerQueue  queue_;
    539 
    540   static void *Worker(void *p) {
    541     ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
    542     while (true) {
    543       Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
    544       if(closure == NULL) {
    545         return NULL;
    546       }
    547       closure->Execute();
    548     }
    549   }
    550 };
    551 
    552 #ifndef NO_BARRIER
    553 /// Wrapper for pthread_barrier_t.
    554 class Barrier{
    555  public:
    556   explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));}
    557   ~Barrier()                      {CHECK(0 == pthread_barrier_destroy(&b_));}
    558   void Block() {
    // helgrind 3.3.0 does not have an interceptor for barriers,
    // but our current local version does.
    561     // ANNOTATE_CONDVAR_SIGNAL(this);
    562     pthread_barrier_wait(&b_);
    563     // ANNOTATE_CONDVAR_WAIT(this, this);
    564   }
    565  private:
    566   pthread_barrier_t b_;
    567 };
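
// Example (illustrative only): making n_threads threads rendezvous once.
//
//   Barrier barrier(n_threads);
//
//   // In each of the n_threads threads:
//   // ... phase 1 ...
//   barrier.Block();   // no thread proceeds until all have arrived
//   // ... phase 2 ...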
    568 
    569 #endif // NO_BARRIER
    570 
    571 class BlockingCounter {
    572  public:
    573   explicit BlockingCounter(int initial_count) :
    574     count_(initial_count) {}
    575   bool DecrementCount() {
    576     MutexLock lock(&mu_);
    577     count_--;
    578     return count_ == 0;
    579   }
    580   void Wait() {
    581     mu_.LockWhen(Condition(&IsZero, &count_));
    582     mu_.Unlock();
    583   }
    584  private:
    585   static bool IsZero(int *arg) { return *arg == 0; }
    586   Mutex mu_;
    587   int count_;
    588 };
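
// Example (illustrative only): waiting for n_workers threads to finish.
//
//   BlockingCounter done(n_workers);
//
//   // Each worker calls this when it completes:
//   done.DecrementCount();
//
//   // The main thread blocks until the count reaches zero:
//   done.Wait();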
    589 
    590 int AtomicIncrement(volatile int *value, int increment);
    591 
    592 #ifndef VGO_darwin
    593 inline int AtomicIncrement(volatile int *value, int increment) {
    594   return __sync_add_and_fetch(value, increment);
    595 }
    596 
    597 #else
    598 // Mac OS X version.
    599 inline int AtomicIncrement(volatile int *value, int increment) {
    600   return OSAtomicAdd32(increment, value);
    601 }
    602 
    603 // TODO(timurrrr) this is a hack
    604 #define memalign(A,B) malloc(B)
    605 
    606 // TODO(timurrrr) this is a hack
    607 int posix_memalign(void **out, size_t al, size_t size) {
    608   *out = memalign(al, size);
    609   return (*out == 0);
    610 }
    611 #endif // VGO_darwin
    612 
    613 #endif // THREAD_WRAPPERS_PTHREAD_H
    614 // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker
    615