Home | History | Annotate | Download | only in src
      1 // Copyright 2006 Google Inc. All Rights Reserved.
      2 
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 
      7 //      http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 // worker.h : worker thread interface
     16 
     17 // This file contains the Worker Thread class interface
// for the SAT test. Worker Threads implement a repetitive
     19 // task used to test or stress the system.
     20 
     21 #ifndef STRESSAPPTEST_WORKER_H_
     22 #define STRESSAPPTEST_WORKER_H_
     23 
     24 #include <pthread.h>
     25 
     26 #include <sys/time.h>
     27 #include <sys/types.h>
     28 
     29 #ifdef HAVE_LIBAIO_H
     30 #include <libaio.h>
     31 #endif
     32 
     33 #include <queue>
     34 #include <set>
     35 #include <string>
     36 #include <vector>
     37 
     38 // This file must work with autoconf on its public version,
     39 // so these includes are correct.
     40 #include "disk_blocks.h"
     41 #include "queue.h"
     42 #include "sattypes.h"
     43 
     44 
// Global data structure shared by the Cache Coherency Worker Threads.
struct cc_cacheline_data {
  // Integer array belonging to one cacheline record. Per the member comments
  // on CpuCacheCoherencyThread, each thread indexes it with its own
  // cc_thread_num_.
  // NOTE(review): allocation, length and ownership of this array are not
  // visible in this header -- confirm against the code that builds it.
  int *num;
};
     49 
     50 // Typical usage:
     51 // (Other workflows may be possible, see function comments for details.)
     52 // - Control thread creates object.
     53 // - Control thread calls AddWorkers(1) for each worker thread.
     54 // - Control thread calls Initialize().
     55 // - Control thread launches worker threads.
     56 // - Every worker thread frequently calls ContinueRunning().
     57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
     58 //     then calls ResumeWorkers().
     59 // - Some worker threads may exit early, before StopWorkers() is called.  They
     60 //     call RemoveSelf() after their last call to ContinueRunning().
     61 // - Control thread eventually calls StopWorkers().
     62 // - Worker threads exit.
     63 // - Control thread joins worker threads.
     64 // - Control thread calls Destroy().
     65 // - Control thread destroys object.
     66 //
     67 // Threadsafety:
     68 // - ContinueRunning() may be called concurrently by different workers, but not
     69 //     by a single worker.
// - No other methods may ever be called concurrently, with themselves or
//     each other.
     72 // - This object may be used by multiple threads only between Initialize() and
     73 //     Destroy().
     74 //
     75 // TODO(matthewb): Move this class and its unittest to their own files.
     76 class WorkerStatus {
     77  public:
     78   //--------------------------------
     79   // Methods for the control thread.
     80   //--------------------------------
     81 
     82   WorkerStatus() : num_workers_(0), status_(RUN) {}
     83 
     84   // Called by the control thread to increase the worker count.  Must be called
     85   // before Initialize().  The worker count is 0 upon object initialization.
     86   void AddWorkers(int num_new_workers) {
     87     // No need to lock num_workers_mutex_ because this is before Initialize().
     88     num_workers_ += num_new_workers;
     89   }
     90 
     91   // Called by the control thread.  May not be called multiple times.  If
     92   // called, Destroy() must be called before destruction.
     93   void Initialize();
     94 
     95   // Called by the control thread after joining all worker threads.  Must be
     96   // called iff Initialize() was called.  No methods may be called after calling
     97   // this.
     98   void Destroy();
     99 
    100   // Called by the control thread to tell the workers to pause.  Does not return
    101   // until all workers have called ContinueRunning() or RemoveSelf().  May only
    102   // be called between Initialize() and Stop().  Must not be called multiple
    103   // times without ResumeWorkers() having been called inbetween.
    104   void PauseWorkers();
    105 
    106   // Called by the control thread to tell the workers to resume from a pause.
    107   // May only be called between Initialize() and Stop().  May only be called
    108   // directly after PauseWorkers().
    109   void ResumeWorkers();
    110 
    111   // Called by the control thread to tell the workers to stop.  May only be
    112   // called between Initialize() and Destroy().  May only be called once.
    113   void StopWorkers();
    114 
    115   //--------------------------------
    116   // Methods for the worker threads.
    117   //--------------------------------
    118 
    119   // Called by worker threads to decrease the worker count by one.  May only be
    120   // called between Initialize() and Destroy().  May wait for ResumeWorkers()
    121   // when called after PauseWorkers().
    122   void RemoveSelf();
    123 
    124   // Called by worker threads between Initialize() and Destroy().  May be called
    125   // any number of times.  Return value is whether or not the worker should
    126   // continue running.  When called after PauseWorkers(), does not return until
    127   // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
    128   // calling threads must match the worker count (see AddWorkers() and
    129   // RemoveSelf()).
    130   bool ContinueRunning();
    131 
    132   // TODO(matthewb): Is this functionality really necessary?  Remove it if not.
    133   //
    134   // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
    135   // any worker threads use this exclusively in place of ContinueRunning() then
    136   // PauseWorkers() should never be used!
    137   bool ContinueRunningNoPause();
    138 
    139  private:
    140   enum Status { RUN, PAUSE, STOP };
    141 
    142   void WaitOnPauseBarrier() {
    143 #ifdef _POSIX_BARRIERS
    144     int error = pthread_barrier_wait(&pause_barrier_);
    145     if (error != PTHREAD_BARRIER_SERIAL_THREAD)
    146       sat_assert(error == 0);
    147 #endif
    148   }
    149 
    150   void AcquireNumWorkersLock() {
    151     sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
    152   }
    153 
    154   void ReleaseNumWorkersLock() {
    155     sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
    156   }
    157 
    158   void AcquireStatusReadLock() {
    159     sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
    160   }
    161 
    162   void AcquireStatusWriteLock() {
    163     sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
    164   }
    165 
    166   void ReleaseStatusLock() {
    167     sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
    168   }
    169 
    170   Status GetStatus() {
    171     AcquireStatusReadLock();
    172     Status status = status_;
    173     ReleaseStatusLock();
    174     return status;
    175   }
    176 
    177   // Returns the previous status.
    178   Status SetStatus(Status status) {
    179     AcquireStatusWriteLock();
    180     Status prev_status = status_;
    181     status_ = status;
    182     ReleaseStatusLock();
    183     return prev_status;
    184   }
    185 
    186   pthread_mutex_t num_workers_mutex_;
    187   int num_workers_;
    188 
    189   pthread_rwlock_t status_rwlock_;
    190   Status status_;
    191 
    192 #ifdef _POSIX_BARRIERS
    193   // Guaranteed to not be in use when (status_ != PAUSE).
    194   pthread_barrier_t pause_barrier_;
    195 #endif
    196 
    197   DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
    198 };
    199 
    200 
    201 // This is a base class for worker threads.
    202 // Each thread repeats a specific
    203 // task on various blocks of memory.
    204 class WorkerThread {
    205  public:
    206   // Enum to mark a thread as low/med/high priority.
    207   enum Priority {
    208     Low,
    209     Normal,
    210     High,
    211   };
    212   WorkerThread();
    213   virtual ~WorkerThread();
    214 
    215   // Initialize values and thread ID number.
    216   virtual void InitThread(int thread_num_init,
    217                           class Sat *sat_init,
    218                           class OsLayer *os_init,
    219                           class PatternList *patternlist_init,
    220                           WorkerStatus *worker_status);
    221 
    222   // This function is DEPRECATED, it does nothing.
    223   void SetPriority(Priority priority) { priority_ = priority; }
    224   // Spawn the worker thread, by running Work().
    225   int SpawnThread();
    226   // Only for ThreadSpawnerGeneric().
    227   void StartRoutine();
    228   bool InitPriority();
    229 
    230   // Wait for the thread to complete its cleanup.
    231   virtual bool JoinThread();
    232   // Kill worker thread with SIGINT.
    233   virtual bool KillThread();
    234 
    235   // This is the task function that the thread executes.
    236   // This is implemented per subclass.
    237   virtual bool Work();
    238 
    239   // Starts per-WorkerThread timer.
    240   void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
    241   // Reads current timer value and returns run duration without recording it.
    242   int64 ReadThreadTimer() {
    243     struct timeval end_time_;
    244     gettimeofday(&end_time_, NULL);
    245     return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
    246       (end_time_.tv_usec - start_time_.tv_usec);
    247   }
    248   // Stops per-WorkerThread timer and records thread run duration.
    249   // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
    250   // is effectively paused and restarted, so runduration_usec accumulates on.
    251   void StopThreadTimer() {
    252     runduration_usec_ += ReadThreadTimer();
    253   }
    254 
    255   // Acccess member variables.
    256   bool GetStatus() {return status_;}
    257   int64 GetErrorCount() {return errorcount_;}
    258   int64 GetPageCount() {return pages_copied_;}
    259   int64 GetRunDurationUSec() {return runduration_usec_;}
    260 
    261   // Returns bandwidth defined as pages_copied / thread_run_durations.
    262   virtual float GetCopiedData();
    263   // Calculate worker thread specific copied data.
    264   virtual float GetMemoryCopiedData() {return 0;}
    265   virtual float GetDeviceCopiedData() {return 0;}
    266   // Calculate worker thread specific bandwidth.
    267   virtual float GetMemoryBandwidth()
    268     {return GetMemoryCopiedData() / (
    269         runduration_usec_ * 1.0 / 1000000);}
    270   virtual float GetDeviceBandwidth()
    271     {return GetDeviceCopiedData() / (
    272         runduration_usec_ * 1.0 / 1000000);}
    273 
    274   void set_cpu_mask(cpu_set_t *mask) {
    275     memcpy(&cpu_mask_, mask, sizeof(*mask));
    276   }
    277 
    278   void set_cpu_mask_to_cpu(int cpu_num) {
    279     cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
    280   }
    281 
    282   void set_tag(int32 tag) {tag_ = tag;}
    283 
    284   // Returns CPU mask, where each bit represents a logical cpu.
    285   bool AvailableCpus(cpu_set_t *cpuset);
    286   // Returns CPU mask of CPUs this thread is bound to,
    287   bool CurrentCpus(cpu_set_t *cpuset);
    288   // Returns Current Cpus mask as string.
    289   string CurrentCpusFormat() {
    290     cpu_set_t current_cpus;
    291     CurrentCpus(&current_cpus);
    292     return cpuset_format(&current_cpus);
    293   }
    294 
    295   int ThreadID() {return thread_num_;}
    296 
    297   // Bind worker thread to specified CPU(s)
    298   bool BindToCpus(const cpu_set_t *cpuset);
    299 
    300  protected:
    301   // This function dictates whether the main work loop
    302   // continues, waits, or terminates.
    303   // All work loops should be of the form:
    304   //   do {
    305   //     // work.
    306   //   } while (IsReadyToRun());
    307   virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
    308   // TODO(matthewb): Is this function really necessary? Remove it if not.
    309   //
    310   // Like IsReadyToRun(), except it won't pause.
    311   virtual bool IsReadyToRunNoPause() {
    312     return worker_status_->ContinueRunningNoPause();
    313   }
    314 
    315   // These are functions used by the various work loops.
    316   // Pretty print and log a data miscompare.
    317   virtual void ProcessError(struct ErrorRecord *er,
    318                             int priority,
    319                             const char *message);
    320 
    321   // Compare a region of memory with a known data patter, and report errors.
    322   virtual int CheckRegion(void *addr,
    323                           class Pattern *pat,
    324                           int64 length,
    325                           int offset,
    326                           int64 patternoffset);
    327 
    328   // Fast compare a block of memory.
    329   virtual int CrcCheckPage(struct page_entry *srcpe);
    330 
    331   // Fast copy a block of memory, while verifying correctness.
    332   virtual int CrcCopyPage(struct page_entry *dstpe,
    333                           struct page_entry *srcpe);
    334 
    335   // Fast copy a block of memory, while verifying correctness, and heating CPU.
    336   virtual int CrcWarmCopyPage(struct page_entry *dstpe,
    337                               struct page_entry *srcpe);
    338 
    339   // Fill a page with its specified pattern.
    340   virtual bool FillPage(struct page_entry *pe);
    341 
    342   // Copy with address tagging.
    343   virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
    344                                 uint64 *srcmem64,
    345                                 unsigned int size_in_bytes,
    346                                 AdlerChecksum *checksum,
    347                                 struct page_entry *pe);
    348   // SSE copy with address tagging.
    349   virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
    350                                    uint64 *srcmem64,
    351                                    unsigned int size_in_bytes,
    352                                    AdlerChecksum *checksum,
    353                                    struct page_entry *pe);
    354   // Crc data with address tagging.
    355   virtual bool AdlerAddrCrcC(uint64 *srcmem64,
    356                              unsigned int size_in_bytes,
    357                              AdlerChecksum *checksum,
    358                              struct page_entry *pe);
    359   // Setup tagging on an existing page.
    360   virtual bool TagAddrC(uint64 *memwords,
    361                         unsigned int size_in_bytes);
    362   // Report a mistagged cacheline.
    363   virtual bool ReportTagError(uint64 *mem64,
    364                       uint64 actual,
    365                       uint64 tag);
    366   // Print out the error record of the tag mismatch.
    367   virtual void ProcessTagError(struct ErrorRecord *error,
    368                        int priority,
    369                        const char *message);
    370 
    371   // A worker thread can yield itself to give up CPU until it's scheduled again
    372   bool YieldSelf();
    373 
    374  protected:
    375   // General state variables that all subclasses need.
    376   int thread_num_;                  // Thread ID.
    377   volatile bool status_;            // Error status.
    378   volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
    379   volatile int64 errorcount_;       // Miscompares seen by this thread.
    380 
    381   cpu_set_t cpu_mask_;              // Cores this thread is allowed to run on.
    382   volatile uint32 tag_;             // Tag hint for memory this thread can use.
    383 
    384   bool tag_mode_;                   // Tag cachelines with vaddr.
    385 
    386   // Thread timing variables.
    387   struct timeval start_time_;        // Worker thread start time.
    388   volatile int64 runduration_usec_;  // Worker run duration in u-seconds.
    389 
    390   // Function passed to pthread_create.
    391   void *(*thread_spawner_)(void *args);
    392   pthread_t thread_;                // Pthread thread ID.
    393   Priority priority_;               // Worker thread priority.
    394   class Sat *sat_;                  // Reference to parent stest object.
    395   class OsLayer *os_;               // Os abstraction: put hacks here.
    396   class PatternList *patternlist_;  // Reference to data patterns.
    397 
    398   // Work around style guide ban on sizeof(int).
    399   static const uint64 iamint_ = 0;
    400   static const int wordsize_ = sizeof(iamint_);
    401 
    402  private:
    403   WorkerStatus *worker_status_;
    404 
    405   DISALLOW_COPY_AND_ASSIGN(WorkerThread);
    406 };
    407 
    408 // Worker thread to perform File IO.
class FileThread : public WorkerThread {
 public:
  FileThread();
  // Set filename to use for file IO.
  virtual void SetFile(const char *filename_init);
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

  // Calculate worker thread specific bandwidth.
  // Device traffic is reported as twice GetCopiedData().
  // NOTE(review): presumably because each page is both written to and read
  // back from the file -- confirm against the implementation of Work().
  virtual float GetDeviceCopiedData()
    {return GetCopiedData()*2;}
  virtual float GetMemoryCopiedData();

 protected:
  // Record of where these pages were sourced from, and what
  // potentially broken components they passed through.
  struct PageRec {
     struct Pattern *pattern;  // This is the data it should contain.
     void *src;  // This is the memory location the data was sourced from.
     void *dst;  // This is where it ended up.
  };

  // These are functions used by the various work loops.
  // Pretty print and log a data miscompare. Disks require
  // slightly different error handling.
  virtual void ProcessError(struct ErrorRecord *er,
                            int priority,
                            const char *message);

  // Open the test file, returning the descriptor through 'pfile', and close
  // a descriptor obtained that way.
  virtual bool OpenFile(int *pfile);
  virtual bool CloseFile(int fd);

  // Read and write whole file to disk.
  virtual bool WritePages(int fd);
  virtual bool ReadPages(int fd);

  // Read and write pages to disk.
  virtual bool WritePageToFile(int fd, struct page_entry *src);
  virtual bool ReadPageFromFile(int fd, struct page_entry *dst);

  // Sector tagging support.
  virtual bool SectorTagPage(struct page_entry *src, int block);
  virtual bool SectorValidatePage(const struct PageRec &page,
                                  struct page_entry *dst,
                                  int block);

  // Set up the memory used for data transfers.
  virtual bool PagePrepare();
  // Remove memory allocated for data transfer.
  virtual bool PageTeardown();

  // Get memory for an incoming data transfer.
  virtual bool GetEmptyPage(struct page_entry *dst);
  // Get memory for an outgoing data transfer.
  virtual bool GetValidPage(struct page_entry *dst);
  // Throw out a used empty page.
  virtual bool PutEmptyPage(struct page_entry *src);
  // Throw out a used, filled page.
  virtual bool PutValidPage(struct page_entry *src);


  struct PageRec *page_recs_;          // Array of page records.
  int crc_page_;                        // Page currently being CRC checked.
  string filename_;                     // Name of file to access.
  string devicename_;                   // Name of device file is on.

  bool page_io_;                        // Use page pool for IO.
  void *local_page_;                   // malloc'd page for non-pool IO.
  int pass_;                            // Number of writes to the file so far.

  // Tag to detect file corruption.
  // Four one-byte fields followed by padding, so that (with a one-byte
  // uint8) the structure spans 512 bytes.
  struct SectorTag {
    volatile uint8 magic;
    volatile uint8 block;
    volatile uint8 sector;
    volatile uint8 pass;
    char pad[512-4];
  };

  DISALLOW_COPY_AND_ASSIGN(FileThread);
};
    489 
    490 
    491 // Worker thread to perform Network IO.
class NetworkThread : public WorkerThread {
 public:
  NetworkThread();
  // Set hostname to use for net IO.
  virtual void SetIP(const char *ipaddr_init);
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

  // Calculate worker thread specific bandwidth.
  // Device traffic is reported as twice GetCopiedData().
  // NOTE(review): presumably one send plus one receive per page -- confirm
  // against the implementation of Work().
  virtual float GetDeviceCopiedData()
    {return GetCopiedData()*2;}

 protected:
  // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
  virtual bool IsNetworkStopSet();
  // Socket lifecycle helpers; CreateSocket() hands the new descriptor back
  // through 'psocket'.
  virtual bool CreateSocket(int *psocket);
  virtual bool CloseSocket(int sock);
  virtual bool Connect(int sock);
  // Transfer one page over 'sock'.
  virtual bool SendPage(int sock, struct page_entry *src);
  virtual bool ReceivePage(int sock, struct page_entry *dst);
  char ipaddr_[256];  // Peer address text; fixed 256-byte buffer.
  int sock_;          // Socket descriptor used by this thread.

 private:
  DISALLOW_COPY_AND_ASSIGN(NetworkThread);
};
    517 
    518 // Worker thread to reflect Network IO.
class NetworkSlaveThread : public NetworkThread {
 public:
  NetworkSlaveThread();
  // Set socket for IO.
  virtual void SetSock(int sock);
  // Thread task; overrides NetworkThread::Work().
  virtual bool Work();

 protected:
  // Overrides the stop check that NetworkThread exposes for this purpose.
  virtual bool IsNetworkStopSet();

 private:
  DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
};
    532 
    533 // Worker thread to detect incoming Network IO.
class NetworkListenThread : public NetworkThread {
 public:
  NetworkListenThread();
  // Thread task; overrides NetworkThread::Work().
  virtual bool Work();

 private:
  // Steps of the accept loop; implementations live outside this header.
  virtual bool Listen();
  virtual bool Wait();
  // Hands the accepted connection's descriptor back through 'pnewsock'.
  virtual bool GetConnection(int *pnewsock);
  virtual bool SpawnSlave(int newsock, int threadid);
  virtual bool ReapSlaves();

  // For serviced incoming connections.
  // Each child pairs a slave thread with its own WorkerStatus.
  struct ChildWorker {
    WorkerStatus status;
    NetworkSlaveThread thread;
  };
  typedef vector<ChildWorker*> ChildVector;
  // NOTE(review): holds raw pointers; presumably created by SpawnSlave() and
  // released by ReapSlaves() -- confirm in the implementation.
  ChildVector child_workers_;

  DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
};
    556 
    557 // Worker thread to perform Memory Copy.
    558 class CopyThread : public WorkerThread {
    559  public:
    560   CopyThread() {}
    561   virtual bool Work();
    562   // Calculate worker thread specific bandwidth.
    563   virtual float GetMemoryCopiedData()
    564     {return GetCopiedData()*2;}
    565 
    566  private:
    567   DISALLOW_COPY_AND_ASSIGN(CopyThread);
    568 };
    569 
    570 // Worker thread to perform Memory Invert.
    571 class InvertThread : public WorkerThread {
    572  public:
    573   InvertThread() {}
    574   virtual bool Work();
    575   // Calculate worker thread specific bandwidth.
    576   virtual float GetMemoryCopiedData()
    577     {return GetCopiedData()*4;}
    578 
    579  private:
    580   virtual int InvertPageUp(struct page_entry *srcpe);
    581   virtual int InvertPageDown(struct page_entry *srcpe);
    582   DISALLOW_COPY_AND_ASSIGN(InvertThread);
    583 };
    584 
    585 // Worker thread to fill blank pages on startup.
class FillThread : public WorkerThread {
 public:
  FillThread();
  // Set how many pages this thread should fill before exiting.
  virtual void SetFillPages(int64 num_pages_to_fill_init);
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

 private:
  // Fill a page with the data pattern in pe->pattern.
  virtual bool FillPageRandom(struct page_entry *pe);
  // Page quota for this thread; see SetFillPages().
  int64 num_pages_to_fill_;
  DISALLOW_COPY_AND_ASSIGN(FillThread);
};
    599 
    600 // Worker thread to verify page data matches pattern data.
    601 // Thread will check and replace pages until "done" flag is set,
    602 // then it will check and discard pages until no more remain.
    603 class CheckThread : public WorkerThread {
    604  public:
    605   CheckThread() {}
    606   virtual bool Work();
    607   // Calculate worker thread specific bandwidth.
    608   virtual float GetMemoryCopiedData()
    609     {return GetCopiedData();}
    610 
    611  private:
    612   DISALLOW_COPY_AND_ASSIGN(CheckThread);
    613 };
    614 
    615 
    616 // Worker thread to poll for system error messages.
    617 // Thread will check for messages until "done" flag is set.
class ErrorPollThread : public WorkerThread {
 public:
  ErrorPollThread() {}
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

 private:
  DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
};
    626 
    627 // Computation intensive worker thread to stress CPU.
class CpuStressThread : public WorkerThread {
 public:
  CpuStressThread() {}
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

 private:
  DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
};
    636 
    637 // Worker thread that tests the correctness of the
    638 // CPU Cache Coherency Protocol.
class CpuCacheCoherencyThread : public WorkerThread {
 public:
  // The four arguments correspond to the like-named members below.
  // NOTE(review): the last three parameter names carry the same trailing
  // underscore as the members, which is easy to misread.
  CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
                          int cc_cacheline_count_,
                          int cc_thread_num_,
                          int cc_inc_count_);
  // Thread task; overrides WorkerThread::Work().
  virtual bool Work();

 protected:
  cc_cacheline_data *cc_cacheline_data_;  // Data structure for each cacheline.
  int cc_local_num_;        // Local counter for each thread.
  int cc_cacheline_count_;  // Number of cache lines to operate on.
  int cc_thread_num_;       // The integer id of the thread which is
                            // used as an index into the integer array
                            // of the cacheline datastructure.
  int cc_inc_count_;        // Number of times to increment the counter.

 private:
  DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
};
    659 
    660 // Worker thread to perform disk test.
    661 class DiskThread : public WorkerThread {
    662  public:
    663   explicit DiskThread(DiskBlockTable *block_table);
    664   virtual ~DiskThread();
    665   // Calculate disk thread specific bandwidth.
    666   virtual float GetDeviceCopiedData() {
    667     return (blocks_written_ * write_block_size_ +
    668             blocks_read_ * read_block_size_) / kMegabyte;}
    669 
    670   // Set filename for device file (in /dev).
    671   virtual void SetDevice(const char *device_name);
    672   // Set various parameters that control the behaviour of the test.
    673   virtual bool SetParameters(int read_block_size,
    674                              int write_block_size,
    675                              int64 segment_size,
    676                              int64 cache_size,
    677                              int blocks_per_segment,
    678                              int64 read_threshold,
    679                              int64 write_threshold,
    680                              int non_destructive);
    681 
    682   virtual bool Work();
    683 
    684   virtual float GetMemoryCopiedData() {return 0;}
    685 
    686  protected:
    687   static const int kSectorSize = 512;       // Size of sector on disk.
    688   static const int kBufferAlignment = 512;  // Buffer alignment required by the
    689                                             // kernel.
    690   static const int kBlockRetry = 100;       // Number of retries to allocate
    691                                             // sectors.
    692 
    693   enum IoOp {
    694     ASYNC_IO_READ   = 0,
    695     ASYNC_IO_WRITE  = 1
    696   };
    697 
    698   virtual bool OpenDevice(int *pfile);
    699   virtual bool CloseDevice(int fd);
    700 
    701   // Retrieves the size (in bytes) of the disk/file.
    702   virtual bool GetDiskSize(int fd);
    703 
    704   // Retrieves the current time in microseconds.
    705   virtual int64 GetTime();
    706 
    707   // Do an asynchronous disk I/O operation.
    708   virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
    709                            int64 offset, int64 timeout);
    710 
    711   // Write a block to disk.
    712   virtual bool WriteBlockToDisk(int fd, BlockData *block);
    713 
    714   // Verify a block on disk.
    715   virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
    716 
    717   // Main work loop.
    718   virtual bool DoWork(int fd);
    719 
    720   int read_block_size_;       // Size of blocks read from disk, in bytes.
    721   int write_block_size_;      // Size of blocks written to disk, in bytes.
    722   int64 blocks_read_;         // Number of blocks read in work loop.
    723   int64 blocks_written_;      // Number of blocks written in work loop.
    724   int64 segment_size_;        // Size of disk segments (in bytes) that the disk
    725                               // will be split into where testing can be
    726                               // confined to a particular segment.
    727                               // Allows for control of how evenly the disk will
    728                               // be tested.  Smaller segments imply more even
    729                               // testing (less random).
    730   int blocks_per_segment_;    // Number of blocks that will be tested per
    731                               // segment.
    732   int cache_size_;            // Size of disk cache, in bytes.
    733   int queue_size_;            // Length of in-flight-blocks queue, in blocks.
    734   int non_destructive_;       // Use non-destructive mode or not.
    735   int update_block_table_;    // If true, assume this is the thread
    736                               // responsible for writing the data in the disk
    737                               // for this block device and, therefore,
    738                               // update the block table. If false, just use
    739                               // the block table to get data.
    740 
    741   // read/write times threshold for reporting a problem
    742   int64 read_threshold_;      // Maximum time a read should take (in us) before
    743                               // a warning is given.
    744   int64 write_threshold_;     // Maximum time a write should take (in us) before
    745                               // a warning is given.
    746   int64 read_timeout_;        // Maximum time a read can take before a timeout
    747                               // and the aborting of the read operation.
    748   int64 write_timeout_;       // Maximum time a write can take before a timeout
    749                               // and the aborting of the write operation.
    750 
    751   string device_name_;        // Name of device file to access.
    752   int64 device_sectors_;      // Number of sectors on the device.
    753 
    754   std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
    755                                                 // not verified.
    756   void *block_buffer_;        // Pointer to aligned block buffer.
    757 
    758 #ifdef HAVE_LIBAIO_H
    759   io_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
    760 #endif
    761 
    762   DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
    763                                  // threads that read / write at the same
    764                                  // device
    765 
    766   DISALLOW_COPY_AND_ASSIGN(DiskThread);
    767 };
    768 
// Disk worker that replaces DiskThread's main work loop with its own.
// NOTE(review): judging by the name, the access pattern is randomized --
// confirm in the implementation of DoWork().
class RandomDiskThread : public DiskThread {
 public:
  explicit RandomDiskThread(DiskBlockTable *block_table);
  virtual ~RandomDiskThread();
  // Main work loop.
  virtual bool DoWork(int fd);
 protected:
  DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
};
    778 
    779 // Worker thread to perform checks in a specific memory region.
    780 class MemoryRegionThread : public WorkerThread {
    781  public:
    782   MemoryRegionThread();
    783   ~MemoryRegionThread();
    784   virtual bool Work();
    785   void ProcessError(struct ErrorRecord *error, int priority,
    786                     const char *message);
    787   bool SetRegion(void *region, int64 size);
    788   // Calculate worker thread specific bandwidth.
    789   virtual float GetMemoryCopiedData()
    790     {return GetCopiedData();}
    791   virtual float GetDeviceCopiedData()
    792     {return GetCopiedData() * 2;}
    793   void SetIdentifier(string identifier) {
    794     identifier_ = identifier;
    795   }
    796 
    797  protected:
    798   // Page queue for this particular memory region.
    799   char *region_;
    800   PageEntryQueue *pages_;
    801   bool error_injection_;
    802   int phase_;
    803   string identifier_;
    804   static const int kPhaseNoPhase = 0;
    805   static const int kPhaseCopy = 1;
    806   static const int kPhaseCheck = 2;
    807 
    808  private:
    809   DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
    810 };
    811 
    812 #endif  // STRESSAPPTEST_WORKER_H_
    813