1 // Copyright 2006 Google Inc. All Rights Reserved. 2 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 7 // http://www.apache.org/licenses/LICENSE-2.0 8 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 // worker.h : worker thread interface 16 17 // This file contains the Worker Thread class interface 18 // for the SAT test. Worker Threads implement a repetative 19 // task used to test or stress the system. 20 21 #ifndef STRESSAPPTEST_WORKER_H_ 22 #define STRESSAPPTEST_WORKER_H_ 23 24 #include <pthread.h> 25 26 #include <sys/time.h> 27 #include <sys/types.h> 28 29 #ifdef HAVE_LIBAIO_H 30 #include <libaio.h> 31 #endif 32 33 #include <queue> 34 #include <set> 35 #include <string> 36 #include <vector> 37 38 // This file must work with autoconf on its public version, 39 // so these includes are correct. 40 #include "disk_blocks.h" 41 #include "queue.h" 42 #include "sattypes.h" 43 44 45 // Global Datastruture shared by the Cache Coherency Worker Threads. 46 struct cc_cacheline_data { 47 int *num; 48 }; 49 50 // Typical usage: 51 // (Other workflows may be possible, see function comments for details.) 52 // - Control thread creates object. 53 // - Control thread calls AddWorkers(1) for each worker thread. 54 // - Control thread calls Initialize(). 55 // - Control thread launches worker threads. 56 // - Every worker thread frequently calls ContinueRunning(). 57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and 58 // then calls ResumeWorkers(). 59 // - Some worker threads may exit early, before StopWorkers() is called. They 60 // call RemoveSelf() after their last call to ContinueRunning(). 61 // - Control thread eventually calls StopWorkers(). 62 // - Worker threads exit. 63 // - Control thread joins worker threads. 64 // - Control thread calls Destroy(). 65 // - Control thread destroys object. 66 // 67 // Threadsafety: 68 // - ContinueRunning() may be called concurrently by different workers, but not 69 // by a single worker. 70 // - No other methods may ever be called concurrently, with themselves or 71 // eachother. 72 // - This object may be used by multiple threads only between Initialize() and 73 // Destroy(). 74 // 75 // TODO(matthewb): Move this class and its unittest to their own files. 76 class WorkerStatus { 77 public: 78 //-------------------------------- 79 // Methods for the control thread. 80 //-------------------------------- 81 82 WorkerStatus() : num_workers_(0), status_(RUN) {} 83 84 // Called by the control thread to increase the worker count. Must be called 85 // before Initialize(). The worker count is 0 upon object initialization. 86 void AddWorkers(int num_new_workers) { 87 // No need to lock num_workers_mutex_ because this is before Initialize(). 88 num_workers_ += num_new_workers; 89 } 90 91 // Called by the control thread. May not be called multiple times. If 92 // called, Destroy() must be called before destruction. 93 void Initialize(); 94 95 // Called by the control thread after joining all worker threads. Must be 96 // called iff Initialize() was called. No methods may be called after calling 97 // this. 98 void Destroy(); 99 100 // Called by the control thread to tell the workers to pause. Does not return 101 // until all workers have called ContinueRunning() or RemoveSelf(). May only 102 // be called between Initialize() and Stop(). Must not be called multiple 103 // times without ResumeWorkers() having been called inbetween. 104 void PauseWorkers(); 105 106 // Called by the control thread to tell the workers to resume from a pause. 107 // May only be called between Initialize() and Stop(). May only be called 108 // directly after PauseWorkers(). 109 void ResumeWorkers(); 110 111 // Called by the control thread to tell the workers to stop. May only be 112 // called between Initialize() and Destroy(). May only be called once. 113 void StopWorkers(); 114 115 //-------------------------------- 116 // Methods for the worker threads. 117 //-------------------------------- 118 119 // Called by worker threads to decrease the worker count by one. May only be 120 // called between Initialize() and Destroy(). May wait for ResumeWorkers() 121 // when called after PauseWorkers(). 122 void RemoveSelf(); 123 124 // Called by worker threads between Initialize() and Destroy(). May be called 125 // any number of times. Return value is whether or not the worker should 126 // continue running. When called after PauseWorkers(), does not return until 127 // ResumeWorkers() or StopWorkers() has been called. Number of distinct 128 // calling threads must match the worker count (see AddWorkers() and 129 // RemoveSelf()). 130 bool ContinueRunning(); 131 132 // TODO(matthewb): Is this functionality really necessary? Remove it if not. 133 // 134 // This is a hack! It's like ContinueRunning(), except it won't pause. If 135 // any worker threads use this exclusively in place of ContinueRunning() then 136 // PauseWorkers() should never be used! 137 bool ContinueRunningNoPause(); 138 139 private: 140 enum Status { RUN, PAUSE, STOP }; 141 142 void WaitOnPauseBarrier() { 143 #ifdef _POSIX_BARRIERS 144 int error = pthread_barrier_wait(&pause_barrier_); 145 if (error != PTHREAD_BARRIER_SERIAL_THREAD) 146 sat_assert(error == 0); 147 #endif 148 } 149 150 void AcquireNumWorkersLock() { 151 sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_)); 152 } 153 154 void ReleaseNumWorkersLock() { 155 sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_)); 156 } 157 158 void AcquireStatusReadLock() { 159 sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_)); 160 } 161 162 void AcquireStatusWriteLock() { 163 sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_)); 164 } 165 166 void ReleaseStatusLock() { 167 sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_)); 168 } 169 170 Status GetStatus() { 171 AcquireStatusReadLock(); 172 Status status = status_; 173 ReleaseStatusLock(); 174 return status; 175 } 176 177 // Returns the previous status. 178 Status SetStatus(Status status) { 179 AcquireStatusWriteLock(); 180 Status prev_status = status_; 181 status_ = status; 182 ReleaseStatusLock(); 183 return prev_status; 184 } 185 186 pthread_mutex_t num_workers_mutex_; 187 int num_workers_; 188 189 pthread_rwlock_t status_rwlock_; 190 Status status_; 191 192 #ifdef _POSIX_BARRIERS 193 // Guaranteed to not be in use when (status_ != PAUSE). 194 pthread_barrier_t pause_barrier_; 195 #endif 196 197 DISALLOW_COPY_AND_ASSIGN(WorkerStatus); 198 }; 199 200 201 // This is a base class for worker threads. 202 // Each thread repeats a specific 203 // task on various blocks of memory. 204 class WorkerThread { 205 public: 206 // Enum to mark a thread as low/med/high priority. 207 enum Priority { 208 Low, 209 Normal, 210 High, 211 }; 212 WorkerThread(); 213 virtual ~WorkerThread(); 214 215 // Initialize values and thread ID number. 216 virtual void InitThread(int thread_num_init, 217 class Sat *sat_init, 218 class OsLayer *os_init, 219 class PatternList *patternlist_init, 220 WorkerStatus *worker_status); 221 222 // This function is DEPRECATED, it does nothing. 223 void SetPriority(Priority priority) { priority_ = priority; } 224 // Spawn the worker thread, by running Work(). 225 int SpawnThread(); 226 // Only for ThreadSpawnerGeneric(). 227 void StartRoutine(); 228 bool InitPriority(); 229 230 // Wait for the thread to complete its cleanup. 231 virtual bool JoinThread(); 232 // Kill worker thread with SIGINT. 233 virtual bool KillThread(); 234 235 // This is the task function that the thread executes. 236 // This is implemented per subclass. 237 virtual bool Work(); 238 239 // Starts per-WorkerThread timer. 240 void StartThreadTimer() {gettimeofday(&start_time_, NULL);} 241 // Reads current timer value and returns run duration without recording it. 242 int64 ReadThreadTimer() { 243 struct timeval end_time_; 244 gettimeofday(&end_time_, NULL); 245 return (end_time_.tv_sec - start_time_.tv_sec)*1000000 + 246 (end_time_.tv_usec - start_time_.tv_usec); 247 } 248 // Stops per-WorkerThread timer and records thread run duration. 249 // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer 250 // is effectively paused and restarted, so runduration_usec accumulates on. 251 void StopThreadTimer() { 252 runduration_usec_ += ReadThreadTimer(); 253 } 254 255 // Acccess member variables. 256 bool GetStatus() {return status_;} 257 int64 GetErrorCount() {return errorcount_;} 258 int64 GetPageCount() {return pages_copied_;} 259 int64 GetRunDurationUSec() {return runduration_usec_;} 260 261 // Returns bandwidth defined as pages_copied / thread_run_durations. 262 virtual float GetCopiedData(); 263 // Calculate worker thread specific copied data. 264 virtual float GetMemoryCopiedData() {return 0;} 265 virtual float GetDeviceCopiedData() {return 0;} 266 // Calculate worker thread specific bandwidth. 267 virtual float GetMemoryBandwidth() 268 {return GetMemoryCopiedData() / ( 269 runduration_usec_ * 1.0 / 1000000);} 270 virtual float GetDeviceBandwidth() 271 {return GetDeviceCopiedData() / ( 272 runduration_usec_ * 1.0 / 1000000);} 273 274 void set_cpu_mask(cpu_set_t *mask) { 275 memcpy(&cpu_mask_, mask, sizeof(*mask)); 276 } 277 278 void set_cpu_mask_to_cpu(int cpu_num) { 279 cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1); 280 } 281 282 void set_tag(int32 tag) {tag_ = tag;} 283 284 // Returns CPU mask, where each bit represents a logical cpu. 285 bool AvailableCpus(cpu_set_t *cpuset); 286 // Returns CPU mask of CPUs this thread is bound to, 287 bool CurrentCpus(cpu_set_t *cpuset); 288 // Returns Current Cpus mask as string. 289 string CurrentCpusFormat() { 290 cpu_set_t current_cpus; 291 CurrentCpus(¤t_cpus); 292 return cpuset_format(¤t_cpus); 293 } 294 295 int ThreadID() {return thread_num_;} 296 297 // Bind worker thread to specified CPU(s) 298 bool BindToCpus(const cpu_set_t *cpuset); 299 300 protected: 301 // This function dictates whether the main work loop 302 // continues, waits, or terminates. 303 // All work loops should be of the form: 304 // do { 305 // // work. 306 // } while (IsReadyToRun()); 307 virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); } 308 // TODO(matthewb): Is this function really necessary? Remove it if not. 309 // 310 // Like IsReadyToRun(), except it won't pause. 311 virtual bool IsReadyToRunNoPause() { 312 return worker_status_->ContinueRunningNoPause(); 313 } 314 315 // These are functions used by the various work loops. 316 // Pretty print and log a data miscompare. 317 virtual void ProcessError(struct ErrorRecord *er, 318 int priority, 319 const char *message); 320 321 // Compare a region of memory with a known data patter, and report errors. 322 virtual int CheckRegion(void *addr, 323 class Pattern *pat, 324 int64 length, 325 int offset, 326 int64 patternoffset); 327 328 // Fast compare a block of memory. 329 virtual int CrcCheckPage(struct page_entry *srcpe); 330 331 // Fast copy a block of memory, while verifying correctness. 332 virtual int CrcCopyPage(struct page_entry *dstpe, 333 struct page_entry *srcpe); 334 335 // Fast copy a block of memory, while verifying correctness, and heating CPU. 336 virtual int CrcWarmCopyPage(struct page_entry *dstpe, 337 struct page_entry *srcpe); 338 339 // Fill a page with its specified pattern. 340 virtual bool FillPage(struct page_entry *pe); 341 342 // Copy with address tagging. 343 virtual bool AdlerAddrMemcpyC(uint64 *dstmem64, 344 uint64 *srcmem64, 345 unsigned int size_in_bytes, 346 AdlerChecksum *checksum, 347 struct page_entry *pe); 348 // SSE copy with address tagging. 349 virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64, 350 uint64 *srcmem64, 351 unsigned int size_in_bytes, 352 AdlerChecksum *checksum, 353 struct page_entry *pe); 354 // Crc data with address tagging. 355 virtual bool AdlerAddrCrcC(uint64 *srcmem64, 356 unsigned int size_in_bytes, 357 AdlerChecksum *checksum, 358 struct page_entry *pe); 359 // Setup tagging on an existing page. 360 virtual bool TagAddrC(uint64 *memwords, 361 unsigned int size_in_bytes); 362 // Report a mistagged cacheline. 363 virtual bool ReportTagError(uint64 *mem64, 364 uint64 actual, 365 uint64 tag); 366 // Print out the error record of the tag mismatch. 367 virtual void ProcessTagError(struct ErrorRecord *error, 368 int priority, 369 const char *message); 370 371 // A worker thread can yield itself to give up CPU until it's scheduled again 372 bool YieldSelf(); 373 374 protected: 375 // General state variables that all subclasses need. 376 int thread_num_; // Thread ID. 377 volatile bool status_; // Error status. 378 volatile int64 pages_copied_; // Recorded for memory bandwidth calc. 379 volatile int64 errorcount_; // Miscompares seen by this thread. 380 381 cpu_set_t cpu_mask_; // Cores this thread is allowed to run on. 382 volatile uint32 tag_; // Tag hint for memory this thread can use. 383 384 bool tag_mode_; // Tag cachelines with vaddr. 385 386 // Thread timing variables. 387 struct timeval start_time_; // Worker thread start time. 388 volatile int64 runduration_usec_; // Worker run duration in u-seconds. 389 390 // Function passed to pthread_create. 391 void *(*thread_spawner_)(void *args); 392 pthread_t thread_; // Pthread thread ID. 393 Priority priority_; // Worker thread priority. 394 class Sat *sat_; // Reference to parent stest object. 395 class OsLayer *os_; // Os abstraction: put hacks here. 396 class PatternList *patternlist_; // Reference to data patterns. 397 398 // Work around style guide ban on sizeof(int). 399 static const uint64 iamint_ = 0; 400 static const int wordsize_ = sizeof(iamint_); 401 402 private: 403 WorkerStatus *worker_status_; 404 405 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 406 }; 407 408 // Worker thread to perform File IO. 409 class FileThread : public WorkerThread { 410 public: 411 FileThread(); 412 // Set filename to use for file IO. 413 virtual void SetFile(const char *filename_init); 414 virtual bool Work(); 415 416 // Calculate worker thread specific bandwidth. 417 virtual float GetDeviceCopiedData() 418 {return GetCopiedData()*2;} 419 virtual float GetMemoryCopiedData(); 420 421 protected: 422 // Record of where these pages were sourced from, and what 423 // potentially broken components they passed through. 424 struct PageRec { 425 struct Pattern *pattern; // This is the data it should contain. 426 void *src; // This is the memory location the data was sourced from. 427 void *dst; // This is where it ended up. 428 }; 429 430 // These are functions used by the various work loops. 431 // Pretty print and log a data miscompare. Disks require 432 // slightly different error handling. 433 virtual void ProcessError(struct ErrorRecord *er, 434 int priority, 435 const char *message); 436 437 virtual bool OpenFile(int *pfile); 438 virtual bool CloseFile(int fd); 439 440 // Read and write whole file to disk. 441 virtual bool WritePages(int fd); 442 virtual bool ReadPages(int fd); 443 444 // Read and write pages to disk. 445 virtual bool WritePageToFile(int fd, struct page_entry *src); 446 virtual bool ReadPageFromFile(int fd, struct page_entry *dst); 447 448 // Sector tagging support. 449 virtual bool SectorTagPage(struct page_entry *src, int block); 450 virtual bool SectorValidatePage(const struct PageRec &page, 451 struct page_entry *dst, 452 int block); 453 454 // Get memory for an incoming data transfer.. 455 virtual bool PagePrepare(); 456 // Remove memory allocated for data transfer. 457 virtual bool PageTeardown(); 458 459 // Get memory for an incoming data transfer.. 460 virtual bool GetEmptyPage(struct page_entry *dst); 461 // Get memory for an outgoing data transfer.. 462 virtual bool GetValidPage(struct page_entry *dst); 463 // Throw out a used empty page. 464 virtual bool PutEmptyPage(struct page_entry *src); 465 // Throw out a used, filled page. 466 virtual bool PutValidPage(struct page_entry *src); 467 468 469 struct PageRec *page_recs_; // Array of page records. 470 int crc_page_; // Page currently being CRC checked. 471 string filename_; // Name of file to access. 472 string devicename_; // Name of device file is on. 473 474 bool page_io_; // Use page pool for IO. 475 void *local_page_; // malloc'd page fon non-pool IO. 476 int pass_; // Number of writes to the file so far. 477 478 // Tag to detect file corruption. 479 struct SectorTag { 480 volatile uint8 magic; 481 volatile uint8 block; 482 volatile uint8 sector; 483 volatile uint8 pass; 484 char pad[512-4]; 485 }; 486 487 DISALLOW_COPY_AND_ASSIGN(FileThread); 488 }; 489 490 491 // Worker thread to perform Network IO. 492 class NetworkThread : public WorkerThread { 493 public: 494 NetworkThread(); 495 // Set hostname to use for net IO. 496 virtual void SetIP(const char *ipaddr_init); 497 virtual bool Work(); 498 499 // Calculate worker thread specific bandwidth. 500 virtual float GetDeviceCopiedData() 501 {return GetCopiedData()*2;} 502 503 protected: 504 // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override. 505 virtual bool IsNetworkStopSet(); 506 virtual bool CreateSocket(int *psocket); 507 virtual bool CloseSocket(int sock); 508 virtual bool Connect(int sock); 509 virtual bool SendPage(int sock, struct page_entry *src); 510 virtual bool ReceivePage(int sock, struct page_entry *dst); 511 char ipaddr_[256]; 512 int sock_; 513 514 private: 515 DISALLOW_COPY_AND_ASSIGN(NetworkThread); 516 }; 517 518 // Worker thread to reflect Network IO. 519 class NetworkSlaveThread : public NetworkThread { 520 public: 521 NetworkSlaveThread(); 522 // Set socket for IO. 523 virtual void SetSock(int sock); 524 virtual bool Work(); 525 526 protected: 527 virtual bool IsNetworkStopSet(); 528 529 private: 530 DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread); 531 }; 532 533 // Worker thread to detect incoming Network IO. 534 class NetworkListenThread : public NetworkThread { 535 public: 536 NetworkListenThread(); 537 virtual bool Work(); 538 539 private: 540 virtual bool Listen(); 541 virtual bool Wait(); 542 virtual bool GetConnection(int *pnewsock); 543 virtual bool SpawnSlave(int newsock, int threadid); 544 virtual bool ReapSlaves(); 545 546 // For serviced incoming connections. 547 struct ChildWorker { 548 WorkerStatus status; 549 NetworkSlaveThread thread; 550 }; 551 typedef vector<ChildWorker*> ChildVector; 552 ChildVector child_workers_; 553 554 DISALLOW_COPY_AND_ASSIGN(NetworkListenThread); 555 }; 556 557 // Worker thread to perform Memory Copy. 558 class CopyThread : public WorkerThread { 559 public: 560 CopyThread() {} 561 virtual bool Work(); 562 // Calculate worker thread specific bandwidth. 563 virtual float GetMemoryCopiedData() 564 {return GetCopiedData()*2;} 565 566 private: 567 DISALLOW_COPY_AND_ASSIGN(CopyThread); 568 }; 569 570 // Worker thread to perform Memory Invert. 571 class InvertThread : public WorkerThread { 572 public: 573 InvertThread() {} 574 virtual bool Work(); 575 // Calculate worker thread specific bandwidth. 576 virtual float GetMemoryCopiedData() 577 {return GetCopiedData()*4;} 578 579 private: 580 virtual int InvertPageUp(struct page_entry *srcpe); 581 virtual int InvertPageDown(struct page_entry *srcpe); 582 DISALLOW_COPY_AND_ASSIGN(InvertThread); 583 }; 584 585 // Worker thread to fill blank pages on startup. 586 class FillThread : public WorkerThread { 587 public: 588 FillThread(); 589 // Set how many pages this thread should fill before exiting. 590 virtual void SetFillPages(int64 num_pages_to_fill_init); 591 virtual bool Work(); 592 593 private: 594 // Fill a page with the data pattern in pe->pattern. 595 virtual bool FillPageRandom(struct page_entry *pe); 596 int64 num_pages_to_fill_; 597 DISALLOW_COPY_AND_ASSIGN(FillThread); 598 }; 599 600 // Worker thread to verify page data matches pattern data. 601 // Thread will check and replace pages until "done" flag is set, 602 // then it will check and discard pages until no more remain. 603 class CheckThread : public WorkerThread { 604 public: 605 CheckThread() {} 606 virtual bool Work(); 607 // Calculate worker thread specific bandwidth. 608 virtual float GetMemoryCopiedData() 609 {return GetCopiedData();} 610 611 private: 612 DISALLOW_COPY_AND_ASSIGN(CheckThread); 613 }; 614 615 616 // Worker thread to poll for system error messages. 617 // Thread will check for messages until "done" flag is set. 618 class ErrorPollThread : public WorkerThread { 619 public: 620 ErrorPollThread() {} 621 virtual bool Work(); 622 623 private: 624 DISALLOW_COPY_AND_ASSIGN(ErrorPollThread); 625 }; 626 627 // Computation intensive worker thread to stress CPU. 628 class CpuStressThread : public WorkerThread { 629 public: 630 CpuStressThread() {} 631 virtual bool Work(); 632 633 private: 634 DISALLOW_COPY_AND_ASSIGN(CpuStressThread); 635 }; 636 637 // Worker thread that tests the correctness of the 638 // CPU Cache Coherency Protocol. 639 class CpuCacheCoherencyThread : public WorkerThread { 640 public: 641 CpuCacheCoherencyThread(cc_cacheline_data *cc_data, 642 int cc_cacheline_count_, 643 int cc_thread_num_, 644 int cc_inc_count_); 645 virtual bool Work(); 646 647 protected: 648 cc_cacheline_data *cc_cacheline_data_; // Datstructure for each cacheline. 649 int cc_local_num_; // Local counter for each thread. 650 int cc_cacheline_count_; // Number of cache lines to operate on. 651 int cc_thread_num_; // The integer id of the thread which is 652 // used as an index into the integer array 653 // of the cacheline datastructure. 654 int cc_inc_count_; // Number of times to increment the counter. 655 656 private: 657 DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread); 658 }; 659 660 // Worker thread to perform disk test. 661 class DiskThread : public WorkerThread { 662 public: 663 explicit DiskThread(DiskBlockTable *block_table); 664 virtual ~DiskThread(); 665 // Calculate disk thread specific bandwidth. 666 virtual float GetDeviceCopiedData() { 667 return (blocks_written_ * write_block_size_ + 668 blocks_read_ * read_block_size_) / kMegabyte;} 669 670 // Set filename for device file (in /dev). 671 virtual void SetDevice(const char *device_name); 672 // Set various parameters that control the behaviour of the test. 673 virtual bool SetParameters(int read_block_size, 674 int write_block_size, 675 int64 segment_size, 676 int64 cache_size, 677 int blocks_per_segment, 678 int64 read_threshold, 679 int64 write_threshold, 680 int non_destructive); 681 682 virtual bool Work(); 683 684 virtual float GetMemoryCopiedData() {return 0;} 685 686 protected: 687 static const int kSectorSize = 512; // Size of sector on disk. 688 static const int kBufferAlignment = 512; // Buffer alignment required by the 689 // kernel. 690 static const int kBlockRetry = 100; // Number of retries to allocate 691 // sectors. 692 693 enum IoOp { 694 ASYNC_IO_READ = 0, 695 ASYNC_IO_WRITE = 1 696 }; 697 698 virtual bool OpenDevice(int *pfile); 699 virtual bool CloseDevice(int fd); 700 701 // Retrieves the size (in bytes) of the disk/file. 702 virtual bool GetDiskSize(int fd); 703 704 // Retrieves the current time in microseconds. 705 virtual int64 GetTime(); 706 707 // Do an asynchronous disk I/O operation. 708 virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size, 709 int64 offset, int64 timeout); 710 711 // Write a block to disk. 712 virtual bool WriteBlockToDisk(int fd, BlockData *block); 713 714 // Verify a block on disk. 715 virtual bool ValidateBlockOnDisk(int fd, BlockData *block); 716 717 // Main work loop. 718 virtual bool DoWork(int fd); 719 720 int read_block_size_; // Size of blocks read from disk, in bytes. 721 int write_block_size_; // Size of blocks written to disk, in bytes. 722 int64 blocks_read_; // Number of blocks read in work loop. 723 int64 blocks_written_; // Number of blocks written in work loop. 724 int64 segment_size_; // Size of disk segments (in bytes) that the disk 725 // will be split into where testing can be 726 // confined to a particular segment. 727 // Allows for control of how evenly the disk will 728 // be tested. Smaller segments imply more even 729 // testing (less random). 730 int blocks_per_segment_; // Number of blocks that will be tested per 731 // segment. 732 int cache_size_; // Size of disk cache, in bytes. 733 int queue_size_; // Length of in-flight-blocks queue, in blocks. 734 int non_destructive_; // Use non-destructive mode or not. 735 int update_block_table_; // If true, assume this is the thread 736 // responsible for writing the data in the disk 737 // for this block device and, therefore, 738 // update the block table. If false, just use 739 // the block table to get data. 740 741 // read/write times threshold for reporting a problem 742 int64 read_threshold_; // Maximum time a read should take (in us) before 743 // a warning is given. 744 int64 write_threshold_; // Maximum time a write should take (in us) before 745 // a warning is given. 746 int64 read_timeout_; // Maximum time a read can take before a timeout 747 // and the aborting of the read operation. 748 int64 write_timeout_; // Maximum time a write can take before a timeout 749 // and the aborting of the write operation. 750 751 string device_name_; // Name of device file to access. 752 int64 device_sectors_; // Number of sectors on the device. 753 754 std::queue<BlockData*> in_flight_sectors_; // Queue of sectors written but 755 // not verified. 756 void *block_buffer_; // Pointer to aligned block buffer. 757 758 #ifdef HAVE_LIBAIO_H 759 io_context_t aio_ctx_; // Asynchronous I/O context for Linux native AIO. 760 #endif 761 762 DiskBlockTable *block_table_; // Disk Block Table, shared by all disk 763 // threads that read / write at the same 764 // device 765 766 DISALLOW_COPY_AND_ASSIGN(DiskThread); 767 }; 768 769 class RandomDiskThread : public DiskThread { 770 public: 771 explicit RandomDiskThread(DiskBlockTable *block_table); 772 virtual ~RandomDiskThread(); 773 // Main work loop. 774 virtual bool DoWork(int fd); 775 protected: 776 DISALLOW_COPY_AND_ASSIGN(RandomDiskThread); 777 }; 778 779 // Worker thread to perform checks in a specific memory region. 780 class MemoryRegionThread : public WorkerThread { 781 public: 782 MemoryRegionThread(); 783 ~MemoryRegionThread(); 784 virtual bool Work(); 785 void ProcessError(struct ErrorRecord *error, int priority, 786 const char *message); 787 bool SetRegion(void *region, int64 size); 788 // Calculate worker thread specific bandwidth. 789 virtual float GetMemoryCopiedData() 790 {return GetCopiedData();} 791 virtual float GetDeviceCopiedData() 792 {return GetCopiedData() * 2;} 793 void SetIdentifier(string identifier) { 794 identifier_ = identifier; 795 } 796 797 protected: 798 // Page queue for this particular memory region. 799 char *region_; 800 PageEntryQueue *pages_; 801 bool error_injection_; 802 int phase_; 803 string identifier_; 804 static const int kPhaseNoPhase = 0; 805 static const int kPhaseCopy = 1; 806 static const int kPhaseCheck = 2; 807 808 private: 809 DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread); 810 }; 811 812 #endif // STRESSAPPTEST_WORKER_H_ 813