1 # -*- coding: utf-8 -*- 2 # Copyright 2012 Google Inc. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 """Contains the perfdiag gsutil command.""" 16 17 from __future__ import absolute_import 18 19 import calendar 20 from collections import defaultdict 21 from collections import namedtuple 22 import contextlib 23 import cStringIO 24 import datetime 25 import httplib 26 import json 27 import logging 28 import math 29 import multiprocessing 30 import os 31 import random 32 import re 33 import socket 34 import string 35 import subprocess 36 import tempfile 37 import time 38 39 import boto 40 import boto.gs.connection 41 42 import gslib 43 from gslib.cloud_api import NotFoundException 44 from gslib.cloud_api import ServiceException 45 from gslib.cloud_api_helper import GetDownloadSerializationData 46 from gslib.command import Command 47 from gslib.command import DummyArgChecker 48 from gslib.command_argument import CommandArgument 49 from gslib.commands import config 50 from gslib.cs_api_map import ApiSelector 51 from gslib.exception import CommandException 52 from gslib.file_part import FilePart 53 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents 54 from gslib.storage_url import StorageUrlFromString 55 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 56 from gslib.util import CheckFreeSpace 57 from gslib.util import DivideAndCeil 58 from gslib.util import GetCloudApiInstance 59 from gslib.util import GetFileSize 60 from gslib.util import GetMaxRetryDelay 61 from gslib.util import HumanReadableToBytes 62 from gslib.util import IS_LINUX 63 from gslib.util import MakeBitsHumanReadable 64 from gslib.util import MakeHumanReadable 65 from gslib.util import Percentile 66 from gslib.util import ResumableThreshold 67 68 _SYNOPSIS = """ 69 gsutil perfdiag [-i in.json] 70 gsutil perfdiag [-o out.json] [-n objects] [-c processes] 71 [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory] 72 [-t tests] url... 73 """ 74 75 _DETAILED_HELP_TEXT = (""" 76 <B>SYNOPSIS</B> 77 """ + _SYNOPSIS + """ 78 79 80 <B>DESCRIPTION</B> 81 The perfdiag command runs a suite of diagnostic tests for a given Google 82 Storage bucket. 83 84 The 'url' parameter must name an existing bucket (e.g. gs://foo) to which 85 the user has write permission. Several test files will be uploaded to and 86 downloaded from this bucket. All test files will be deleted at the completion 87 of the diagnostic if it finishes successfully. 88 89 gsutil performance can be impacted by many factors at the client, server, 90 and in-between, such as: CPU speed; available memory; the access path to the 91 local disk; network bandwidth; contention and error rates along the path 92 between gsutil and Google; operating system buffering configuration; and 93 firewalls and other network elements. The perfdiag command is provided so 94 that customers can run a known measurement suite when troubleshooting 95 performance problems. 96 97 98 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> 99 If the Google Cloud Storage Team asks you to run a performance diagnostic 100 please use the following command, and email the output file (output.json) 101 to gs-team (at] google.com: 102 103 gsutil perfdiag -o output.json gs://your-bucket 104 105 106 <B>OPTIONS</B> 107 -n Sets the number of objects to use when downloading and uploading 108 files during tests. Defaults to 5. 109 110 -c Sets the number of processes to use while running throughput 111 experiments. The default value is 1. 112 113 -k Sets the number of threads per process to use while running 114 throughput experiments. Each process will receive an equal number 115 of threads. The default value is 1. 116 117 Note: All specified threads and processes will be created, but may 118 not by saturated with work if too few objects (specified with -n) 119 and too few components (specified with -y) are specified. 120 121 -p Sets the type of parallelism to be used (only applicable when 122 threads or processes are specified and threads * processes > 1). 123 The default is to use fan. Must be one of the following: 124 125 fan 126 Use one thread per object. This is akin to using gsutil -m cp, 127 with sliced object download / parallel composite upload 128 disabled. 129 130 slice 131 Use Y (specified with -y) threads for each object, transferring 132 one object at a time. This is akin to using parallel object 133 download / parallel composite upload, without -m. Sliced 134 uploads not supported for s3. 135 136 both 137 Use Y (specified with -y) threads for each object, transferring 138 multiple objects at a time. This is akin to simultaneously 139 using sliced object download / parallel composite upload and 140 gsutil -m cp. Sliced uploads not supported for s3. 141 142 -y Sets the number of slices to divide each file/object into while 143 transferring data. Only applicable with the slice (or both) 144 parallelism type. The default is 4 slices. 145 146 -s Sets the size (in bytes) for each of the N (set with -n) objects 147 used in the read and write throughput tests. The default is 1 MiB. 148 This can also be specified using byte suffixes such as 500K or 1M. 149 Note: these values are interpreted as multiples of 1024 (K=1024, 150 M=1024*1024, etc.) 151 Note: If rthru_file or wthru_file are performed, N (set with -n) 152 times as much disk space as specified will be required for the 153 operation. 154 155 -d Sets the directory to store temporary local files in. If not 156 specified, a default temporary directory will be used. 157 158 -t Sets the list of diagnostic tests to perform. The default is to 159 run the lat, rthru, and wthru diagnostic tests. Must be a 160 comma-separated list containing one or more of the following: 161 162 lat 163 For N (set with -n) objects, write the object, retrieve its 164 metadata, read the object, and finally delete the object. 165 Record the latency of each operation. 166 167 list 168 Write N (set with -n) objects to the bucket, record how long 169 it takes for the eventually consistent listing call to return 170 the N objects in its result, delete the N objects, then record 171 how long it takes listing to stop returning the N objects. 172 This test is off by default. 173 174 rthru 175 Runs N (set with -n) read operations, with at most C 176 (set with -c) reads outstanding at any given time. 177 178 rthru_file 179 The same as rthru, but simultaneously writes data to the disk, 180 to gauge the performance impact of the local disk on downloads. 181 182 wthru 183 Runs N (set with -n) write operations, with at most C 184 (set with -c) writes outstanding at any given time. 185 186 wthru_file 187 The same as wthru, but simultaneously reads data from the disk, 188 to gauge the performance impact of the local disk on uploads. 189 190 -m Adds metadata to the result JSON file. Multiple -m values can be 191 specified. Example: 192 193 gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname 194 195 Each metadata key will be added to the top-level "metadata" 196 dictionary in the output JSON file. 197 198 -o Writes the results of the diagnostic to an output file. The output 199 is a JSON file containing system information and performance 200 diagnostic results. The file can be read and reported later using 201 the -i option. 202 203 -i Reads the JSON output file created using the -o command and prints 204 a formatted description of the results. 205 206 207 <B>MEASURING AVAILABILITY</B> 208 The perfdiag command ignores the boto num_retries configuration parameter. 209 Instead, it always retries on HTTP errors in the 500 range and keeps track of 210 how many 500 errors were encountered during the test. The availability 211 measurement is reported at the end of the test. 212 213 Note that HTTP responses are only recorded when the request was made in a 214 single process. When using multiple processes or threads, read and write 215 throughput measurements are performed in an external process, so the 216 availability numbers reported won't include the throughput measurements. 217 218 219 <B>NOTE</B> 220 The perfdiag command collects system information. It collects your IP address, 221 executes DNS queries to Google servers and collects the results, and collects 222 network statistics information from the output of netstat -s. It will also 223 attempt to connect to your proxy server if you have one configured. None of 224 this information will be sent to Google unless you choose to send it. 225 """) 226 227 FileDataTuple = namedtuple( 228 'FileDataTuple', 229 'size md5 data') 230 231 # Describes one object in a fanned download. If need_to_slice is specified as 232 # True, the object should be downloaded with the slice strategy. Other field 233 # names are the same as documented in PerfDiagCommand.Download. 234 FanDownloadTuple = namedtuple( 235 'FanDownloadTuple', 236 'need_to_slice object_name file_name serialization_data') 237 238 # Describes one slice in a sliced download. 239 # Field names are the same as documented in PerfDiagCommand.Download. 240 SliceDownloadTuple = namedtuple( 241 'SliceDownloadTuple', 242 'object_name file_name serialization_data start_byte end_byte') 243 244 # Describes one file in a fanned upload. If need_to_slice is specified as 245 # True, the file should be uploaded with the slice strategy. Other field 246 # names are the same as documented in PerfDiagCommand.Upload. 247 FanUploadTuple = namedtuple( 248 'FanUploadTuple', 249 'need_to_slice file_name object_name use_file') 250 251 # Describes one slice in a sliced upload. 252 # Field names are the same as documented in PerfDiagCommand.Upload. 253 SliceUploadTuple = namedtuple( 254 'SliceUploadTuple', 255 'file_name object_name use_file file_start file_size') 256 257 # Dict storing file_path:FileDataTuple for each temporary file used by 258 # perfdiag. This data should be kept outside of the PerfDiagCommand class 259 # since calls to Apply will make copies of all member data. 260 temp_file_dict = {} 261 262 263 class Error(Exception): 264 """Base exception class for this module.""" 265 pass 266 267 268 class InvalidArgument(Error): 269 """Raised on invalid arguments to functions.""" 270 pass 271 272 273 def _DownloadObject(cls, args, thread_state=None): 274 """Function argument to apply for performing fanned parallel downloads. 275 276 Args: 277 cls: The calling PerfDiagCommand class instance. 278 args: A FanDownloadTuple object describing this download. 279 thread_state: gsutil Cloud API instance to use for the operation. 280 """ 281 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) 282 if args.need_to_slice: 283 cls.PerformSlicedDownload(args.object_name, args.file_name, 284 args.serialization_data) 285 else: 286 cls.Download(args.object_name, args.file_name, args.serialization_data) 287 288 289 def _DownloadSlice(cls, args, thread_state=None): 290 """Function argument to apply for performing sliced downloads. 291 292 Args: 293 cls: The calling PerfDiagCommand class instance. 294 args: A SliceDownloadTuple object describing this download. 295 thread_state: gsutil Cloud API instance to use for the operation. 296 """ 297 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) 298 cls.Download(args.object_name, args.file_name, args.serialization_data, 299 args.start_byte, args.end_byte) 300 301 302 def _UploadObject(cls, args, thread_state=None): 303 """Function argument to apply for performing fanned parallel uploads. 304 305 Args: 306 cls: The calling PerfDiagCommand class instance. 307 args: A FanUploadTuple object describing this upload. 308 thread_state: gsutil Cloud API instance to use for the operation. 309 """ 310 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) 311 if args.need_to_slice: 312 cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file) 313 else: 314 cls.Upload(args.file_name, args.object_name, args.use_file) 315 316 317 def _UploadSlice(cls, args, thread_state=None): 318 """Function argument to apply for performing sliced parallel uploads. 319 320 Args: 321 cls: The calling PerfDiagCommand class instance. 322 args: A SliceUploadTuple object describing this upload. 323 thread_state: gsutil Cloud API instance to use for the operation. 324 """ 325 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) 326 cls.Upload(args.file_name, args.object_name, args.use_file, 327 args.file_start, args.file_size) 328 329 330 def _DeleteWrapper(cls, object_name, thread_state=None): 331 """Function argument to apply for performing parallel object deletions. 332 333 Args: 334 cls: The calling PerfDiagCommand class instance. 335 object_name: The object name to delete from the test bucket. 336 thread_state: gsutil Cloud API instance to use for the operation. 337 """ 338 cls.gsutil_api = GetCloudApiInstance(cls, thread_state) 339 cls.Delete(object_name) 340 341 342 def _PerfdiagExceptionHandler(cls, e): 343 """Simple exception handler to allow post-completion status.""" 344 cls.logger.error(str(e)) 345 346 347 def _DummyTrackerCallback(_): 348 pass 349 350 351 class DummyFile(object): 352 """A dummy, file-like object that throws away everything written to it.""" 353 354 def write(self, *args, **kwargs): # pylint: disable=invalid-name 355 pass 356 357 def close(self): # pylint: disable=invalid-name 358 pass 359 360 361 # Many functions in perfdiag re-define a temporary function based on a 362 # variable from a loop, resulting in a false positive from the linter. 363 # pylint: disable=cell-var-from-loop 364 class PerfDiagCommand(Command): 365 """Implementation of gsutil perfdiag command.""" 366 367 # Command specification. See base class for documentation. 368 command_spec = Command.CreateCommandSpec( 369 'perfdiag', 370 command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], 371 usage_synopsis=_SYNOPSIS, 372 min_args=0, 373 max_args=1, 374 supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:', 375 file_url_ok=False, 376 provider_url_ok=False, 377 urls_start_arg=0, 378 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], 379 gs_default_api=ApiSelector.JSON, 380 argparse_arguments=[ 381 CommandArgument.MakeNCloudBucketURLsArgument(1) 382 ] 383 ) 384 # Help specification. See help_provider.py for documentation. 385 help_spec = Command.HelpSpec( 386 help_name='perfdiag', 387 help_name_aliases=[], 388 help_type='command_help', 389 help_one_line_summary='Run performance diagnostic', 390 help_text=_DETAILED_HELP_TEXT, 391 subcommand_help_text={}, 392 ) 393 394 # Byte sizes to use for latency testing files. 395 # TODO: Consider letting the user specify these sizes with a configuration 396 # parameter. 397 test_lat_file_sizes = ( 398 0, # 0 bytes 399 1024, # 1 KiB 400 102400, # 100 KiB 401 1048576, # 1 MiB 402 ) 403 404 # Test names. 405 RTHRU = 'rthru' 406 RTHRU_FILE = 'rthru_file' 407 WTHRU = 'wthru' 408 WTHRU_FILE = 'wthru_file' 409 LAT = 'lat' 410 LIST = 'list' 411 412 # Parallelism strategies. 413 FAN = 'fan' 414 SLICE = 'slice' 415 BOTH = 'both' 416 417 # List of all diagnostic tests. 418 ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST) 419 420 # List of diagnostic tests to run by default. 421 DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) 422 423 # List of parallelism strategies. 424 PARALLEL_STRATEGIES = (FAN, SLICE, BOTH) 425 426 # Google Cloud Storage XML API endpoint host. 427 XML_API_HOST = boto.config.get( 428 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) 429 # Google Cloud Storage XML API endpoint port. 430 XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) 431 432 # Maximum number of times to retry requests on 5xx errors. 433 MAX_SERVER_ERROR_RETRIES = 5 434 # Maximum number of times to retry requests on more serious errors like 435 # the socket breaking. 436 MAX_TOTAL_RETRIES = 10 437 438 # The default buffer size in boto's Key object is set to 8 KiB. This becomes a 439 # bottleneck at high throughput rates, so we increase it. 440 KEY_BUFFER_SIZE = 16384 441 442 # The maximum number of bytes to generate pseudo-randomly before beginning 443 # to repeat bytes. This number was chosen as the next prime larger than 5 MiB. 444 MAX_UNIQUE_RANDOM_BYTES = 5242883 445 446 # Maximum amount of time, in seconds, we will wait for object listings to 447 # reflect what we expect in the listing tests. 448 MAX_LISTING_WAIT_TIME = 60.0 449 450 def _Exec(self, cmd, raise_on_error=True, return_output=False, 451 mute_stderr=False): 452 """Executes a command in a subprocess. 453 454 Args: 455 cmd: List containing the command to execute. 456 raise_on_error: Whether or not to raise an exception when a process exits 457 with a non-zero return code. 458 return_output: If set to True, the return value of the function is the 459 stdout of the process. 460 mute_stderr: If set to True, the stderr of the process is not printed to 461 the console. 462 463 Returns: 464 The return code of the process or the stdout if return_output is set. 465 466 Raises: 467 Exception: If raise_on_error is set to True and any process exits with a 468 non-zero return code. 469 """ 470 self.logger.debug('Running command: %s', cmd) 471 stderr = subprocess.PIPE if mute_stderr else None 472 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) 473 (stdoutdata, _) = p.communicate() 474 if raise_on_error and p.returncode: 475 raise CommandException("Received non-zero return code (%d) from " 476 "subprocess '%s'." % (p.returncode, ' '.join(cmd))) 477 return stdoutdata if return_output else p.returncode 478 479 def _WarnIfLargeData(self): 480 """Outputs a warning message if a large amount of data is being used.""" 481 if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'): 482 self.logger.info('This is a large operation, and could take a while.') 483 484 def _MakeTempFile(self, file_size=0, mem_metadata=False, 485 mem_data=False, prefix='gsutil_test_file'): 486 """Creates a temporary file of the given size and returns its path. 487 488 Args: 489 file_size: The size of the temporary file to create. 490 mem_metadata: If true, store md5 and file size in memory at 491 temp_file_dict[fpath].md5, tempfile_data[fpath].file_size. 492 mem_data: If true, store the file data in memory at 493 temp_file_dict[fpath].data 494 prefix: The prefix to use for the temporary file. Defaults to 495 gsutil_test_file. 496 497 Returns: 498 The file path of the created temporary file. 499 """ 500 fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix, 501 dir=self.directory, text=False) 502 with os.fdopen(fd, 'wb') as fp: 503 random_bytes = os.urandom(min(file_size, 504 self.MAX_UNIQUE_RANDOM_BYTES)) 505 total_bytes_written = 0 506 while total_bytes_written < file_size: 507 num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, 508 file_size - total_bytes_written) 509 fp.write(random_bytes[:num_bytes]) 510 total_bytes_written += num_bytes 511 512 if mem_metadata or mem_data: 513 with open(fpath, 'rb') as fp: 514 file_size = GetFileSize(fp) if mem_metadata else None 515 md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None 516 data = fp.read() if mem_data else None 517 temp_file_dict[fpath] = FileDataTuple(file_size, md5, data) 518 519 self.temporary_files.add(fpath) 520 return fpath 521 522 def _SetUp(self): 523 """Performs setup operations needed before diagnostics can be run.""" 524 525 # Stores test result data. 526 self.results = {} 527 # Set of file paths for local temporary files. 528 self.temporary_files = set() 529 # Set of names for test objects that exist in the test bucket. 530 self.temporary_objects = set() 531 # Total number of HTTP requests made. 532 self.total_requests = 0 533 # Total number of HTTP 5xx errors. 534 self.request_errors = 0 535 # Number of responses, keyed by response code. 536 self.error_responses_by_code = defaultdict(int) 537 # Total number of socket errors. 538 self.connection_breaks = 0 539 # Boolean to prevent doing cleanup twice. 540 self.teardown_completed = False 541 542 # Create files for latency test. 543 if self.LAT in self.diag_tests: 544 self.latency_files = [] 545 for file_size in self.test_lat_file_sizes: 546 fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True) 547 self.latency_files.append(fpath) 548 549 # Create files for throughput tests. 550 if self.diag_tests.intersection( 551 (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)): 552 # Create a file for warming up the TCP connection. 553 self.tcp_warmup_file = self._MakeTempFile( 554 5 * 1024 * 1024, mem_metadata=True, mem_data=True) 555 556 # For in memory tests, throughput tests transfer the same object N times 557 # instead of creating N objects, in order to avoid excessive memory usage. 558 if self.diag_tests.intersection((self.RTHRU, self.WTHRU)): 559 self.mem_thru_file_name = self._MakeTempFile( 560 self.thru_filesize, mem_metadata=True, mem_data=True) 561 self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name) 562 563 # For tests that use disk I/O, it is necessary to create N objects in 564 # in order to properly measure the performance impact of seeks. 565 if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)): 566 # List of file names and corresponding object names to use for file 567 # throughput tests. 568 self.thru_file_names = [] 569 self.thru_object_names = [] 570 571 free_disk_space = CheckFreeSpace(self.directory) 572 if free_disk_space >= self.thru_filesize * self.num_objects: 573 self.logger.info('\nCreating %d local files each of size %s.' 574 % (self.num_objects, 575 MakeHumanReadable(self.thru_filesize))) 576 self._WarnIfLargeData() 577 for _ in range(self.num_objects): 578 file_name = self._MakeTempFile(self.thru_filesize, 579 mem_metadata=True) 580 self.thru_file_names.append(file_name) 581 self.thru_object_names.append(os.path.basename(file_name)) 582 else: 583 raise CommandException( 584 'Not enough free disk space for throughput files: ' 585 '%s of disk space required, but only %s available.' 586 % (MakeHumanReadable(self.thru_filesize * self.num_objects), 587 MakeHumanReadable(free_disk_space))) 588 589 # Dummy file buffer to use for downloading that goes nowhere. 590 self.discard_sink = DummyFile() 591 592 # Filter out misleading progress callback output and the incorrect 593 # suggestion to use gsutil -m perfdiag. 594 self.logger.addFilter(self._PerfdiagFilter()) 595 596 def _TearDown(self): 597 """Performs operations to clean things up after performing diagnostics.""" 598 if not self.teardown_completed: 599 temp_file_dict.clear() 600 601 try: 602 for fpath in self.temporary_files: 603 os.remove(fpath) 604 if self.delete_directory: 605 os.rmdir(self.directory) 606 except OSError: 607 pass 608 609 if self.threads > 1 or self.processes > 1: 610 args = [obj for obj in self.temporary_objects] 611 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, 612 arg_checker=DummyArgChecker, 613 parallel_operations_override=True, 614 process_count=self.processes, thread_count=self.threads) 615 else: 616 for object_name in self.temporary_objects: 617 self.Delete(object_name) 618 self.teardown_completed = True 619 620 @contextlib.contextmanager 621 def _Time(self, key, bucket): 622 """A context manager that measures time. 623 624 A context manager that prints a status message before and after executing 625 the inner command and times how long the inner command takes. Keeps track of 626 the timing, aggregated by the given key. 627 628 Args: 629 key: The key to insert the timing value into a dictionary bucket. 630 bucket: A dictionary to place the timing value in. 631 632 Yields: 633 For the context manager. 634 """ 635 self.logger.info('%s starting...', key) 636 t0 = time.time() 637 yield 638 t1 = time.time() 639 bucket[key].append(t1 - t0) 640 self.logger.info('%s done.', key) 641 642 def _RunOperation(self, func): 643 """Runs an operation with retry logic. 644 645 Args: 646 func: The function to run. 647 648 Returns: 649 True if the operation succeeds, False if aborted. 650 """ 651 # We retry on httplib exceptions that can happen if the socket was closed 652 # by the remote party or the connection broke because of network issues. 653 # Only the BotoServerError is counted as a 5xx error towards the retry 654 # limit. 655 success = False 656 server_error_retried = 0 657 total_retried = 0 658 i = 0 659 return_val = None 660 while not success: 661 next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay()) 662 try: 663 return_val = func() 664 self.total_requests += 1 665 success = True 666 except tuple(self.exceptions) as e: 667 total_retried += 1 668 if total_retried > self.MAX_TOTAL_RETRIES: 669 self.logger.info('Reached maximum total retries. Not retrying.') 670 break 671 if isinstance(e, ServiceException): 672 if e.status >= 500: 673 self.error_responses_by_code[e.status] += 1 674 self.total_requests += 1 675 self.request_errors += 1 676 server_error_retried += 1 677 time.sleep(next_sleep) 678 else: 679 raise 680 if server_error_retried > self.MAX_SERVER_ERROR_RETRIES: 681 self.logger.info( 682 'Reached maximum server error retries. Not retrying.') 683 break 684 else: 685 self.connection_breaks += 1 686 return return_val 687 688 def _RunLatencyTests(self): 689 """Runs latency tests.""" 690 # Stores timing information for each category of operation. 691 self.results['latency'] = defaultdict(list) 692 693 for i in range(self.num_objects): 694 self.logger.info('\nRunning latency iteration %d...', i+1) 695 for fpath in self.latency_files: 696 file_data = temp_file_dict[fpath] 697 url = self.bucket_url.Clone() 698 url.object_name = os.path.basename(fpath) 699 file_size = file_data.size 700 readable_file_size = MakeHumanReadable(file_size) 701 702 self.logger.info( 703 "\nFile of size %s located on disk at '%s' being diagnosed in the " 704 "cloud at '%s'.", readable_file_size, fpath, url) 705 706 upload_target = StorageUrlToUploadObjectMetadata(url) 707 708 def _Upload(): 709 io_fp = cStringIO.StringIO(file_data.data) 710 with self._Time('UPLOAD_%d' % file_size, self.results['latency']): 711 self.gsutil_api.UploadObject( 712 io_fp, upload_target, size=file_size, provider=self.provider, 713 fields=['name']) 714 self._RunOperation(_Upload) 715 716 def _Metadata(): 717 with self._Time('METADATA_%d' % file_size, self.results['latency']): 718 return self.gsutil_api.GetObjectMetadata( 719 url.bucket_name, url.object_name, 720 provider=self.provider, fields=['name', 'contentType', 721 'mediaLink', 'size']) 722 # Download will get the metadata first if we don't pass it in. 723 download_metadata = self._RunOperation(_Metadata) 724 serialization_data = GetDownloadSerializationData(download_metadata) 725 726 def _Download(): 727 with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): 728 self.gsutil_api.GetObjectMedia( 729 url.bucket_name, url.object_name, self.discard_sink, 730 provider=self.provider, serialization_data=serialization_data) 731 self._RunOperation(_Download) 732 733 def _Delete(): 734 with self._Time('DELETE_%d' % file_size, self.results['latency']): 735 self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, 736 provider=self.provider) 737 self._RunOperation(_Delete) 738 739 class _PerfdiagFilter(logging.Filter): 740 741 def filter(self, record): 742 # Used to prevent unnecessary output when using multiprocessing. 743 msg = record.getMessage() 744 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or 745 ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg)) 746 747 def _PerfdiagExceptionHandler(self, e): 748 """Simple exception handler to allow post-completion status.""" 749 self.logger.error(str(e)) 750 751 def PerformFannedDownload(self, need_to_slice, object_names, file_names, 752 serialization_data): 753 """Performs a parallel download of multiple objects using the fan strategy. 754 755 Args: 756 need_to_slice: If True, additionally apply the slice strategy to each 757 object in object_names. 758 object_names: A list of object names to be downloaded. Each object must 759 already exist in the test bucket. 760 file_names: A list, corresponding by index to object_names, of file names 761 for downloaded data. If None, discard downloaded data. 762 serialization_data: A list, corresponding by index to object_names, 763 of serialization data for each object. 764 """ 765 args = [] 766 for i in range(len(object_names)): 767 file_name = file_names[i] if file_names else None 768 args.append(FanDownloadTuple( 769 need_to_slice, object_names[i], file_name, 770 serialization_data[i])) 771 self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler, 772 ('total_requests', 'request_errors'), 773 arg_checker=DummyArgChecker, parallel_operations_override=True, 774 process_count=self.processes, thread_count=self.threads) 775 776 def PerformSlicedDownload(self, object_name, file_name, serialization_data): 777 """Performs a download of an object using the slice strategy. 778 779 Args: 780 object_name: The name of the object to download. 781 file_name: The name of the file to download data to, or None if data 782 should be discarded. 783 serialization_data: The serialization data for the object. 784 """ 785 if file_name: 786 with open(file_name, 'ab') as fp: 787 fp.truncate(self.thru_filesize) 788 component_size = DivideAndCeil(self.thru_filesize, self.num_slices) 789 args = [] 790 for i in range(self.num_slices): 791 start_byte = i * component_size 792 end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1) 793 args.append(SliceDownloadTuple(object_name, file_name, serialization_data, 794 start_byte, end_byte)) 795 self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler, 796 ('total_requests', 'request_errors'), 797 arg_checker=DummyArgChecker, parallel_operations_override=True, 798 process_count=self.processes, thread_count=self.threads) 799 800 def PerformFannedUpload(self, need_to_slice, file_names, object_names, 801 use_file): 802 """Performs a parallel upload of multiple files using the fan strategy. 803 804 The metadata for file_name should be present in temp_file_dict prior 805 to calling. Also, the data for file_name should be present in temp_file_dict 806 if use_file is specified as False. 807 808 Args: 809 need_to_slice: If True, additionally apply the slice strategy to each 810 file in file_names. 811 file_names: A list of file names to be uploaded. 812 object_names: A list, corresponding by by index to file_names, of object 813 names to upload data to. 814 use_file: If true, use disk I/O, otherwise read upload data from memory. 815 """ 816 args = [] 817 for i in range(len(file_names)): 818 args.append(FanUploadTuple( 819 need_to_slice, file_names[i], object_names[i], use_file)) 820 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, 821 ('total_requests', 'request_errors'), 822 arg_checker=DummyArgChecker, parallel_operations_override=True, 823 process_count=self.processes, thread_count=self.threads) 824 825 def PerformSlicedUpload(self, file_name, object_name, use_file): 826 """Performs a parallel upload of a file using the slice strategy. 827 828 The metadata for file_name should be present in temp_file_dict prior 829 to calling. Also, the data from for file_name should be present in 830 temp_file_dict if use_file is specified as False. 831 832 Args: 833 file_name: The name of the file to upload. 834 object_name: The name of the object to upload to. 835 use_file: If true, use disk I/O, otherwise read upload data from memory. 836 """ 837 # Divide the file into components. 838 component_size = DivideAndCeil(self.thru_filesize, self.num_slices) 839 component_object_names = ( 840 [object_name + str(i) for i in range(self.num_slices)]) 841 842 args = [] 843 for i in range(self.num_slices): 844 component_start = i * component_size 845 component_size = min(component_size, 846 temp_file_dict[file_name].size - component_start) 847 args.append(SliceUploadTuple(file_name, component_object_names[i], 848 use_file, component_start, component_size)) 849 850 # Upload the components in parallel. 851 try: 852 self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler, 853 ('total_requests', 'request_errors'), 854 arg_checker=DummyArgChecker, parallel_operations_override=True, 855 process_count=self.processes, thread_count=self.threads) 856 857 # Compose the components into an object. 858 request_components = [] 859 for i in range(self.num_slices): 860 src_obj_metadata = ( 861 apitools_messages.ComposeRequest.SourceObjectsValueListEntry( 862 name=component_object_names[i])) 863 request_components.append(src_obj_metadata) 864 865 dst_obj_metadata = apitools_messages.Object() 866 dst_obj_metadata.name = object_name 867 dst_obj_metadata.bucket = self.bucket_url.bucket_name 868 def _Compose(): 869 self.gsutil_api.ComposeObject(request_components, dst_obj_metadata, 870 provider=self.provider) 871 self._RunOperation(_Compose) 872 finally: 873 # Delete the temporary components. 874 self.Apply(_DeleteWrapper, component_object_names, 875 _PerfdiagExceptionHandler, 876 ('total_requests', 'request_errors'), 877 arg_checker=DummyArgChecker, parallel_operations_override=True, 878 process_count=self.processes, thread_count=self.threads) 879 880 def _RunReadThruTests(self, use_file=False): 881 """Runs read throughput tests.""" 882 test_name = 'read_throughput_file' if use_file else 'read_throughput' 883 file_io_string = 'with file I/O' if use_file else '' 884 self.logger.info( 885 '\nRunning read throughput tests %s (%s objects of size %s)' % 886 (file_io_string, self.num_objects, 887 MakeHumanReadable(self.thru_filesize))) 888 self._WarnIfLargeData() 889 890 self.results[test_name] = {'file_size': self.thru_filesize, 891 'processes': self.processes, 892 'threads': self.threads, 893 'parallelism': self.parallel_strategy 894 } 895 896 # Copy the file(s) to the test bucket, and also get the serialization data 897 # so that we can pass it to download. 898 if use_file: 899 # For test with file I/O use N files on disk to preserve seek performance. 900 file_names = self.thru_file_names 901 object_names = self.thru_object_names 902 serialization_data = [] 903 for i in range(self.num_objects): 904 self.temporary_objects.add(self.thru_object_names[i]) 905 if self.WTHRU_FILE in self.diag_tests: 906 # If we ran the WTHRU_FILE test, then the objects already exist. 907 obj_metadata = self.gsutil_api.GetObjectMetadata( 908 self.bucket_url.bucket_name, self.thru_object_names[i], 909 fields=['size', 'mediaLink'], provider=self.bucket_url.scheme) 910 else: 911 obj_metadata = self.Upload(self.thru_file_names[i], 912 self.thru_object_names[i], use_file) 913 914 # File overwrite causes performance issues with sliced downloads. 915 # Delete the file and reopen it for download. This matches what a real 916 # download would look like. 917 os.unlink(self.thru_file_names[i]) 918 open(self.thru_file_names[i], 'ab').close() 919 serialization_data.append(GetDownloadSerializationData(obj_metadata)) 920 else: 921 # For in-memory test only use one file but copy it num_objects times, to 922 # allow scalability in num_objects. 923 self.temporary_objects.add(self.mem_thru_object_name) 924 obj_metadata = self.Upload(self.mem_thru_file_name, 925 self.mem_thru_object_name, use_file) 926 file_names = None 927 object_names = [self.mem_thru_object_name] * self.num_objects 928 serialization_data = ( 929 [GetDownloadSerializationData(obj_metadata)] * self.num_objects) 930 931 # Warmup the TCP connection. 932 warmup_obj_name = os.path.basename(self.tcp_warmup_file) 933 self.temporary_objects.add(warmup_obj_name) 934 self.Upload(self.tcp_warmup_file, warmup_obj_name) 935 self.Download(warmup_obj_name) 936 937 t0 = time.time() 938 if self.processes == 1 and self.threads == 1: 939 for i in range(self.num_objects): 940 file_name = file_names[i] if use_file else None 941 self.Download(object_names[i], file_name, serialization_data[i]) 942 else: 943 if self.parallel_strategy in (self.FAN, self.BOTH): 944 need_to_slice = (self.parallel_strategy == self.BOTH) 945 self.PerformFannedDownload(need_to_slice, object_names, file_names, 946 serialization_data) 947 elif self.parallel_strategy == self.SLICE: 948 for i in range(self.num_objects): 949 file_name = file_names[i] if use_file else None 950 self.PerformSlicedDownload( 951 object_names[i], file_name, serialization_data[i]) 952 t1 = time.time() 953 954 time_took = t1 - t0 955 total_bytes_copied = self.thru_filesize * self.num_objects 956 bytes_per_second = total_bytes_copied / time_took 957 958 self.results[test_name]['time_took'] = time_took 959 self.results[test_name]['total_bytes_copied'] = total_bytes_copied 960 self.results[test_name]['bytes_per_second'] = bytes_per_second 961 962 def _RunWriteThruTests(self, use_file=False): 963 """Runs write throughput tests.""" 964 test_name = 'write_throughput_file' if use_file else 'write_throughput' 965 file_io_string = 'with file I/O' if use_file else '' 966 self.logger.info( 967 '\nRunning write throughput tests %s (%s objects of size %s)' % 968 (file_io_string, self.num_objects, 969 MakeHumanReadable(self.thru_filesize))) 970 self._WarnIfLargeData() 971 972 self.results[test_name] = {'file_size': self.thru_filesize, 973 'processes': self.processes, 974 'threads': self.threads, 975 'parallelism': self.parallel_strategy} 976 977 # Warmup the TCP connection. 978 warmup_obj_name = os.path.basename(self.tcp_warmup_file) 979 self.temporary_objects.add(warmup_obj_name) 980 self.Upload(self.tcp_warmup_file, warmup_obj_name) 981 982 if use_file: 983 # For test with file I/O use N files on disk to preserve seek performance. 984 file_names = self.thru_file_names 985 object_names = self.thru_object_names 986 else: 987 # For in-memory test only use one file but copy it num_objects times, to 988 # allow for scalability in num_objects. 989 file_names = [self.mem_thru_file_name] * self.num_objects 990 object_names = ( 991 [self.mem_thru_object_name + str(i) for i in range(self.num_objects)]) 992 993 for object_name in object_names: 994 self.temporary_objects.add(object_name) 995 996 t0 = time.time() 997 if self.processes == 1 and self.threads == 1: 998 for i in range(self.num_objects): 999 self.Upload(file_names[i], object_names[i], use_file) 1000 else: 1001 if self.parallel_strategy in (self.FAN, self.BOTH): 1002 need_to_slice = (self.parallel_strategy == self.BOTH) 1003 self.PerformFannedUpload(need_to_slice, file_names, object_names, 1004 use_file) 1005 elif self.parallel_strategy == self.SLICE: 1006 for i in range(self.num_objects): 1007 self.PerformSlicedUpload(file_names[i], object_names[i], use_file) 1008 t1 = time.time() 1009 1010 time_took = t1 - t0 1011 total_bytes_copied = self.thru_filesize * self.num_objects 1012 bytes_per_second = total_bytes_copied / time_took 1013 1014 self.results[test_name]['time_took'] = time_took 1015 self.results[test_name]['total_bytes_copied'] = total_bytes_copied 1016 self.results[test_name]['bytes_per_second'] = bytes_per_second 1017 1018 def _RunListTests(self): 1019 """Runs eventual consistency listing latency tests.""" 1020 self.results['listing'] = {'num_files': self.num_objects} 1021 1022 # Generate N random objects to put into the bucket. 1023 list_prefix = 'gsutil-perfdiag-list-' 1024 list_fpaths = [] 1025 list_objects = [] 1026 args = [] 1027 for _ in xrange(self.num_objects): 1028 fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True, 1029 prefix=list_prefix) 1030 list_fpaths.append(fpath) 1031 object_name = os.path.basename(fpath) 1032 list_objects.append(object_name) 1033 args.append(FanUploadTuple(False, fpath, object_name, False)) 1034 self.temporary_objects.add(object_name) 1035 1036 # Add the objects to the bucket. 1037 self.logger.info( 1038 '\nWriting %s objects for listing test...', self.num_objects) 1039 1040 self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, 1041 arg_checker=DummyArgChecker) 1042 1043 list_latencies = [] 1044 files_seen = [] 1045 total_start_time = time.time() 1046 expected_objects = set(list_objects) 1047 found_objects = set() 1048 1049 def _List(): 1050 """Lists and returns objects in the bucket. Also records latency.""" 1051 t0 = time.time() 1052 objects = list(self.gsutil_api.ListObjects( 1053 self.bucket_url.bucket_name, delimiter='/', 1054 provider=self.provider, fields=['items/name'])) 1055 t1 = time.time() 1056 list_latencies.append(t1 - t0) 1057 return set([obj.data.name for obj in objects]) 1058 1059 self.logger.info( 1060 'Listing bucket %s waiting for %s objects to appear...', 1061 self.bucket_url.bucket_name, self.num_objects) 1062 while expected_objects - found_objects: 1063 def _ListAfterUpload(): 1064 names = _List() 1065 found_objects.update(names & expected_objects) 1066 files_seen.append(len(found_objects)) 1067 self._RunOperation(_ListAfterUpload) 1068 if expected_objects - found_objects: 1069 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: 1070 self.logger.warning('Maximum time reached waiting for listing.') 1071 break 1072 total_end_time = time.time() 1073 1074 self.results['listing']['insert'] = { 1075 'num_listing_calls': len(list_latencies), 1076 'list_latencies': list_latencies, 1077 'files_seen_after_listing': files_seen, 1078 'time_took': total_end_time - total_start_time, 1079 } 1080 1081 args = [object_name for object_name in list_objects] 1082 self.logger.info( 1083 'Deleting %s objects for listing test...', self.num_objects) 1084 self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, 1085 arg_checker=DummyArgChecker) 1086 1087 self.logger.info( 1088 'Listing bucket %s waiting for %s objects to disappear...', 1089 self.bucket_url.bucket_name, self.num_objects) 1090 list_latencies = [] 1091 files_seen = [] 1092 total_start_time = time.time() 1093 found_objects = set(list_objects) 1094 while found_objects: 1095 def _ListAfterDelete(): 1096 names = _List() 1097 found_objects.intersection_update(names) 1098 files_seen.append(len(found_objects)) 1099 self._RunOperation(_ListAfterDelete) 1100 if found_objects: 1101 if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: 1102 self.logger.warning('Maximum time reached waiting for listing.') 1103 break 1104 total_end_time = time.time() 1105 1106 self.results['listing']['delete'] = { 1107 'num_listing_calls': len(list_latencies), 1108 'list_latencies': list_latencies, 1109 'files_seen_after_listing': files_seen, 1110 'time_took': total_end_time - total_start_time, 1111 } 1112 1113 def Upload(self, file_name, object_name, use_file=False, file_start=0, 1114 file_size=None): 1115 """Performs an upload to the test bucket. 1116 1117 The file is uploaded to the bucket referred to by self.bucket_url, and has 1118 name object_name. 1119 1120 Args: 1121 file_name: The path to the local file, and the key to its entry in 1122 temp_file_dict. 1123 object_name: The name of the remote object. 1124 use_file: If true, use disk I/O, otherwise read everything from memory. 1125 file_start: The first byte in the file to upload to the object. 1126 (only should be specified for sliced uploads) 1127 file_size: The size of the file to upload. 1128 (only should be specified for sliced uploads) 1129 1130 Returns: 1131 Uploaded Object Metadata. 1132 """ 1133 fp = None 1134 if file_size is None: 1135 file_size = temp_file_dict[file_name].size 1136 1137 upload_url = self.bucket_url.Clone() 1138 upload_url.object_name = object_name 1139 upload_target = StorageUrlToUploadObjectMetadata(upload_url) 1140 1141 try: 1142 if use_file: 1143 fp = FilePart(file_name, file_start, file_size) 1144 else: 1145 data = temp_file_dict[file_name].data[file_start:file_start+file_size] 1146 fp = cStringIO.StringIO(data) 1147 1148 def _InnerUpload(): 1149 if file_size < ResumableThreshold(): 1150 return self.gsutil_api.UploadObject( 1151 fp, upload_target, provider=self.provider, size=file_size, 1152 fields=['name', 'mediaLink', 'size']) 1153 else: 1154 return self.gsutil_api.UploadObjectResumable( 1155 fp, upload_target, provider=self.provider, size=file_size, 1156 fields=['name', 'mediaLink', 'size'], 1157 tracker_callback=_DummyTrackerCallback) 1158 return self._RunOperation(_InnerUpload) 1159 finally: 1160 if fp: 1161 fp.close() 1162 1163 def Download(self, object_name, file_name=None, serialization_data=None, 1164 start_byte=0, end_byte=None): 1165 """Downloads an object from the test bucket. 1166 1167 Args: 1168 object_name: The name of the object (in the test bucket) to download. 1169 file_name: Optional file name to write downloaded data to. If None, 1170 downloaded data is discarded immediately. 1171 serialization_data: Optional serialization data, used so that we don't 1172 have to get the metadata before downloading. 1173 start_byte: The first byte in the object to download. 1174 (only should be specified for sliced downloads) 1175 end_byte: The last byte in the object to download. 1176 (only should be specified for sliced downloads) 1177 """ 1178 fp = None 1179 try: 1180 if file_name is not None: 1181 fp = open(file_name, 'r+b') 1182 fp.seek(start_byte) 1183 else: 1184 fp = self.discard_sink 1185 1186 def _InnerDownload(): 1187 self.gsutil_api.GetObjectMedia( 1188 self.bucket_url.bucket_name, object_name, fp, 1189 provider=self.provider, start_byte=start_byte, end_byte=end_byte, 1190 serialization_data=serialization_data) 1191 self._RunOperation(_InnerDownload) 1192 finally: 1193 if fp: 1194 fp.close() 1195 1196 def Delete(self, object_name): 1197 """Deletes an object from the test bucket. 1198 1199 Args: 1200 object_name: The name of the object to delete. 1201 """ 1202 try: 1203 def _InnerDelete(): 1204 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name, 1205 provider=self.provider) 1206 self._RunOperation(_InnerDelete) 1207 except NotFoundException: 1208 pass 1209 1210 def _GetDiskCounters(self): 1211 """Retrieves disk I/O statistics for all disks. 1212 1213 Adapted from the psutil module's psutil._pslinux.disk_io_counters: 1214 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py 1215 1216 Originally distributed under under a BSD license. 1217 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. 1218 1219 Returns: 1220 A dictionary containing disk names mapped to the disk counters from 1221 /disk/diskstats. 1222 """ 1223 # iostat documentation states that sectors are equivalent with blocks and 1224 # have a size of 512 bytes since 2.4 kernels. This value is needed to 1225 # calculate the amount of disk I/O in bytes. 1226 sector_size = 512 1227 1228 partitions = [] 1229 with open('/proc/partitions', 'r') as f: 1230 lines = f.readlines()[2:] 1231 for line in lines: 1232 _, _, _, name = line.split() 1233 if name[-1].isdigit(): 1234 partitions.append(name) 1235 1236 retdict = {} 1237 with open('/proc/diskstats', 'r') as f: 1238 for line in f: 1239 values = line.split()[:11] 1240 _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values 1241 if name in partitions: 1242 rbytes = int(rbytes) * sector_size 1243 wbytes = int(wbytes) * sector_size 1244 reads = int(reads) 1245 writes = int(writes) 1246 rtime = int(rtime) 1247 wtime = int(wtime) 1248 retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime) 1249 return retdict 1250 1251 def _GetTcpStats(self): 1252 """Tries to parse out TCP packet information from netstat output. 1253 1254 Returns: 1255 A dictionary containing TCP information, or None if netstat is not 1256 available. 1257 """ 1258 # netstat return code is non-zero for -s on Linux, so don't raise on error. 1259 try: 1260 netstat_output = self._Exec(['netstat', '-s'], return_output=True, 1261 raise_on_error=False) 1262 except OSError: 1263 self.logger.warning('netstat not found on your system; some measurement ' 1264 'data will be missing') 1265 return None 1266 netstat_output = netstat_output.strip().lower() 1267 found_tcp = False 1268 tcp_retransmit = None 1269 tcp_received = None 1270 tcp_sent = None 1271 for line in netstat_output.split('\n'): 1272 # Header for TCP section is "Tcp:" in Linux/Mac and 1273 # "TCP Statistics for" in Windows. 1274 if 'tcp:' in line or 'tcp statistics' in line: 1275 found_tcp = True 1276 1277 # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts" 1278 # Windows == "segments retransmitted". 1279 if (found_tcp and tcp_retransmit is None and 1280 ('segments retransmited' in line or 'retransmit timeouts' in line or 1281 'segments retransmitted' in line)): 1282 tcp_retransmit = ''.join(c for c in line if c in string.digits) 1283 1284 # Linux+Windows == "segments received", Mac == "packets received". 1285 if (found_tcp and tcp_received is None and 1286 ('segments received' in line or 'packets received' in line)): 1287 tcp_received = ''.join(c for c in line if c in string.digits) 1288 1289 # Linux == "segments send out" (sic), Mac+Windows == "packets sent". 1290 if (found_tcp and tcp_sent is None and 1291 ('segments send out' in line or 'packets sent' in line or 1292 'segments sent' in line)): 1293 tcp_sent = ''.join(c for c in line if c in string.digits) 1294 1295 result = {} 1296 try: 1297 result['tcp_retransmit'] = int(tcp_retransmit) 1298 result['tcp_received'] = int(tcp_received) 1299 result['tcp_sent'] = int(tcp_sent) 1300 except (ValueError, TypeError): 1301 result['tcp_retransmit'] = None 1302 result['tcp_received'] = None 1303 result['tcp_sent'] = None 1304 1305 return result 1306 1307 def _CollectSysInfo(self): 1308 """Collects system information.""" 1309 sysinfo = {} 1310 1311 # All exceptions that might be raised from socket module calls. 1312 socket_errors = ( 1313 socket.error, socket.herror, socket.gaierror, socket.timeout) 1314 1315 # Find out whether HTTPS is enabled in Boto. 1316 sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True) 1317 1318 # Look up proxy info. 1319 proxy_host = boto.config.get('Boto', 'proxy', None) 1320 proxy_port = boto.config.getint('Boto', 'proxy_port', 0) 1321 sysinfo['using_proxy'] = bool(proxy_host) 1322 1323 if boto.config.get('Boto', 'proxy_rdns', False): 1324 self.logger.info('DNS lookups are disallowed in this environment, so ' 1325 'some information is not included in this perfdiag run.') 1326 1327 # Get the local IP address from socket lib. 1328 try: 1329 sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) 1330 except socket_errors: 1331 sysinfo['ip_address'] = '' 1332 # Record the temporary directory used since it can affect performance, e.g. 1333 # when on a networked filesystem. 1334 sysinfo['tempdir'] = self.directory 1335 1336 # Produces an RFC 2822 compliant GMT timestamp. 1337 sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', 1338 time.gmtime()) 1339 1340 # Execute a CNAME lookup on Google DNS to find what Google server 1341 # it's routing to. 1342 cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] 1343 try: 1344 nslookup_cname_output = self._Exec(cmd, return_output=True) 1345 m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output) 1346 sysinfo['googserv_route'] = m.group('googserv') if m else None 1347 except (CommandException, OSError): 1348 sysinfo['googserv_route'] = '' 1349 1350 # Try to determine the latency of a DNS lookup for the Google hostname 1351 # endpoint. Note: we don't piggyback on gethostbyname_ex below because 1352 # the _ex version requires an extra RTT. 1353 try: 1354 t0 = time.time() 1355 socket.gethostbyname(self.XML_API_HOST) 1356 t1 = time.time() 1357 sysinfo['google_host_dns_latency'] = t1 - t0 1358 except socket_errors: 1359 pass 1360 1361 # Look up IP addresses for Google Server. 1362 try: 1363 (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST) 1364 sysinfo['googserv_ips'] = ipaddrlist 1365 except socket_errors: 1366 ipaddrlist = [] 1367 sysinfo['googserv_ips'] = [] 1368 1369 # Reverse lookup the hostnames for the Google Server IPs. 1370 sysinfo['googserv_hostnames'] = [] 1371 for googserv_ip in ipaddrlist: 1372 try: 1373 (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip) 1374 sysinfo['googserv_hostnames'].append(hostname) 1375 except socket_errors: 1376 pass 1377 1378 # Query o-o to find out what the Google DNS thinks is the user's IP. 1379 try: 1380 cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.'] 1381 nslookup_txt_output = self._Exec(cmd, return_output=True) 1382 m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output) 1383 sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None 1384 except (CommandException, OSError): 1385 sysinfo['dns_o-o_ip'] = '' 1386 1387 # Try to determine the latency of connecting to the Google hostname 1388 # endpoint. 1389 sysinfo['google_host_connect_latencies'] = {} 1390 for googserv_ip in ipaddrlist: 1391 try: 1392 sock = socket.socket() 1393 t0 = time.time() 1394 sock.connect((googserv_ip, self.XML_API_PORT)) 1395 t1 = time.time() 1396 sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0 1397 except socket_errors: 1398 pass 1399 1400 # If using a proxy, try to determine the latency of a DNS lookup to resolve 1401 # the proxy hostname and the latency of connecting to the proxy. 1402 if proxy_host: 1403 proxy_ip = None 1404 try: 1405 t0 = time.time() 1406 proxy_ip = socket.gethostbyname(proxy_host) 1407 t1 = time.time() 1408 sysinfo['proxy_dns_latency'] = t1 - t0 1409 except socket_errors: 1410 pass 1411 1412 try: 1413 sock = socket.socket() 1414 t0 = time.time() 1415 sock.connect((proxy_ip or proxy_host, proxy_port)) 1416 t1 = time.time() 1417 sysinfo['proxy_host_connect_latency'] = t1 - t0 1418 except socket_errors: 1419 pass 1420 1421 # Try and find the number of CPUs in the system if available. 1422 try: 1423 sysinfo['cpu_count'] = multiprocessing.cpu_count() 1424 except NotImplementedError: 1425 sysinfo['cpu_count'] = None 1426 1427 # For *nix platforms, obtain the CPU load. 1428 try: 1429 sysinfo['load_avg'] = list(os.getloadavg()) 1430 except (AttributeError, OSError): 1431 sysinfo['load_avg'] = None 1432 1433 # Try and collect memory information from /proc/meminfo if possible. 1434 mem_total = None 1435 mem_free = None 1436 mem_buffers = None 1437 mem_cached = None 1438 1439 try: 1440 with open('/proc/meminfo', 'r') as f: 1441 for line in f: 1442 if line.startswith('MemTotal'): 1443 mem_total = (int(''.join(c for c in line if c in string.digits)) 1444 * 1000) 1445 elif line.startswith('MemFree'): 1446 mem_free = (int(''.join(c for c in line if c in string.digits)) 1447 * 1000) 1448 elif line.startswith('Buffers'): 1449 mem_buffers = (int(''.join(c for c in line if c in string.digits)) 1450 * 1000) 1451 elif line.startswith('Cached'): 1452 mem_cached = (int(''.join(c for c in line if c in string.digits)) 1453 * 1000) 1454 except (IOError, ValueError): 1455 pass 1456 1457 sysinfo['meminfo'] = {'mem_total': mem_total, 1458 'mem_free': mem_free, 1459 'mem_buffers': mem_buffers, 1460 'mem_cached': mem_cached} 1461 1462 # Get configuration attributes from config module. 1463 sysinfo['gsutil_config'] = {} 1464 for attr in dir(config): 1465 attr_value = getattr(config, attr) 1466 # Filter out multiline strings that are not useful. 1467 if attr.isupper() and not (isinstance(attr_value, basestring) and 1468 '\n' in attr_value): 1469 sysinfo['gsutil_config'][attr] = attr_value 1470 1471 sysinfo['tcp_proc_values'] = {} 1472 stats_to_check = [ 1473 '/proc/sys/net/core/rmem_default', 1474 '/proc/sys/net/core/rmem_max', 1475 '/proc/sys/net/core/wmem_default', 1476 '/proc/sys/net/core/wmem_max', 1477 '/proc/sys/net/ipv4/tcp_timestamps', 1478 '/proc/sys/net/ipv4/tcp_sack', 1479 '/proc/sys/net/ipv4/tcp_window_scaling', 1480 ] 1481 for fname in stats_to_check: 1482 try: 1483 with open(fname, 'r') as f: 1484 value = f.read() 1485 sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip() 1486 except IOError: 1487 pass 1488 1489 self.results['sysinfo'] = sysinfo 1490 1491 def _DisplayStats(self, trials): 1492 """Prints out mean, standard deviation, median, and 90th percentile.""" 1493 n = len(trials) 1494 mean = float(sum(trials)) / n 1495 stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n) 1496 1497 print str(n).rjust(6), '', 1498 print ('%.1f' % (mean * 1000)).rjust(9), '', 1499 print ('%.1f' % (stdev * 1000)).rjust(12), '', 1500 print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '', 1501 print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), '' 1502 1503 def _DisplayResults(self): 1504 """Displays results collected from diagnostic run.""" 1505 print 1506 print '=' * 78 1507 print 'DIAGNOSTIC RESULTS'.center(78) 1508 print '=' * 78 1509 1510 if 'latency' in self.results: 1511 print 1512 print '-' * 78 1513 print 'Latency'.center(78) 1514 print '-' * 78 1515 print ('Operation Size Trials Mean (ms) Std Dev (ms) ' 1516 'Median (ms) 90th % (ms)') 1517 print ('========= ========= ====== ========= ============ ' 1518 '=========== ===========') 1519 for key in sorted(self.results['latency']): 1520 trials = sorted(self.results['latency'][key]) 1521 op, numbytes = key.split('_') 1522 numbytes = int(numbytes) 1523 if op == 'METADATA': 1524 print 'Metadata'.rjust(9), '', 1525 print MakeHumanReadable(numbytes).rjust(9), '', 1526 self._DisplayStats(trials) 1527 if op == 'DOWNLOAD': 1528 print 'Download'.rjust(9), '', 1529 print MakeHumanReadable(numbytes).rjust(9), '', 1530 self._DisplayStats(trials) 1531 if op == 'UPLOAD': 1532 print 'Upload'.rjust(9), '', 1533 print MakeHumanReadable(numbytes).rjust(9), '', 1534 self._DisplayStats(trials) 1535 if op == 'DELETE': 1536 print 'Delete'.rjust(9), '', 1537 print MakeHumanReadable(numbytes).rjust(9), '', 1538 self._DisplayStats(trials) 1539 1540 if 'write_throughput' in self.results: 1541 print 1542 print '-' * 78 1543 print 'Write Throughput'.center(78) 1544 print '-' * 78 1545 write_thru = self.results['write_throughput'] 1546 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( 1547 self.num_objects, 1548 MakeHumanReadable(write_thru['file_size']), 1549 MakeHumanReadable(write_thru['total_bytes_copied'])) 1550 print 'Write throughput: %s/s.' % ( 1551 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) 1552 print 'Parallelism strategy: %s' % write_thru['parallelism'] 1553 1554 if 'write_throughput_file' in self.results: 1555 print 1556 print '-' * 78 1557 print 'Write Throughput With File I/O'.center(78) 1558 print '-' * 78 1559 write_thru_file = self.results['write_throughput_file'] 1560 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( 1561 self.num_objects, 1562 MakeHumanReadable(write_thru_file['file_size']), 1563 MakeHumanReadable(write_thru_file['total_bytes_copied'])) 1564 print 'Write throughput: %s/s.' % ( 1565 MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8)) 1566 print 'Parallelism strategy: %s' % write_thru_file['parallelism'] 1567 1568 if 'read_throughput' in self.results: 1569 print 1570 print '-' * 78 1571 print 'Read Throughput'.center(78) 1572 print '-' * 78 1573 read_thru = self.results['read_throughput'] 1574 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( 1575 self.num_objects, 1576 MakeHumanReadable(read_thru['file_size']), 1577 MakeHumanReadable(read_thru['total_bytes_copied'])) 1578 print 'Read throughput: %s/s.' % ( 1579 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) 1580 print 'Parallelism strategy: %s' % read_thru['parallelism'] 1581 1582 if 'read_throughput_file' in self.results: 1583 print 1584 print '-' * 78 1585 print 'Read Throughput With File I/O'.center(78) 1586 print '-' * 78 1587 read_thru_file = self.results['read_throughput_file'] 1588 print 'Copied %s %s file(s) for a total transfer size of %s.' % ( 1589 self.num_objects, 1590 MakeHumanReadable(read_thru_file['file_size']), 1591 MakeHumanReadable(read_thru_file['total_bytes_copied'])) 1592 print 'Read throughput: %s/s.' % ( 1593 MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8)) 1594 print 'Parallelism strategy: %s' % read_thru_file['parallelism'] 1595 1596 if 'listing' in self.results: 1597 print 1598 print '-' * 78 1599 print 'Listing'.center(78) 1600 print '-' * 78 1601 1602 listing = self.results['listing'] 1603 insert = listing['insert'] 1604 delete = listing['delete'] 1605 print 'After inserting %s objects:' % listing['num_files'] 1606 print (' Total time for objects to appear: %.2g seconds' % 1607 insert['time_took']) 1608 print ' Number of listing calls made: %s' % insert['num_listing_calls'] 1609 print (' Individual listing call latencies: [%s]' % 1610 ', '.join('%.2gs' % lat for lat in insert['list_latencies'])) 1611 print (' Files reflected after each call: [%s]' % 1612 ', '.join(map(str, insert['files_seen_after_listing']))) 1613 1614 print 'After deleting %s objects:' % listing['num_files'] 1615 print (' Total time for objects to appear: %.2g seconds' % 1616 delete['time_took']) 1617 print ' Number of listing calls made: %s' % delete['num_listing_calls'] 1618 print (' Individual listing call latencies: [%s]' % 1619 ', '.join('%.2gs' % lat for lat in delete['list_latencies'])) 1620 print (' Files reflected after each call: [%s]' % 1621 ', '.join(map(str, delete['files_seen_after_listing']))) 1622 1623 if 'sysinfo' in self.results: 1624 print 1625 print '-' * 78 1626 print 'System Information'.center(78) 1627 print '-' * 78 1628 info = self.results['sysinfo'] 1629 print 'IP Address: \n %s' % info['ip_address'] 1630 print 'Temporary Directory: \n %s' % info['tempdir'] 1631 print 'Bucket URI: \n %s' % self.results['bucket_uri'] 1632 print 'gsutil Version: \n %s' % self.results.get('gsutil_version', 1633 'Unknown') 1634 print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown') 1635 1636 if 'gmt_timestamp' in info: 1637 ts_string = info['gmt_timestamp'] 1638 timetuple = None 1639 try: 1640 # Convert RFC 2822 string to Linux timestamp. 1641 timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000') 1642 except ValueError: 1643 pass 1644 1645 if timetuple: 1646 # Converts the GMT time tuple to local Linux timestamp. 1647 localtime = calendar.timegm(timetuple) 1648 localdt = datetime.datetime.fromtimestamp(localtime) 1649 print 'Measurement time: \n %s' % localdt.strftime( 1650 '%Y-%m-%d %I:%M:%S %p %Z') 1651 1652 print 'Google Server: \n %s' % info['googserv_route'] 1653 print ('Google Server IP Addresses: \n %s' % 1654 ('\n '.join(info['googserv_ips']))) 1655 print ('Google Server Hostnames: \n %s' % 1656 ('\n '.join(info['googserv_hostnames']))) 1657 print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip'] 1658 print 'CPU Count: \n %s' % info['cpu_count'] 1659 print 'CPU Load Average: \n %s' % info['load_avg'] 1660 try: 1661 print ('Total Memory: \n %s' % 1662 MakeHumanReadable(info['meminfo']['mem_total'])) 1663 # Free memory is really MemFree + Buffers + Cached. 1664 print 'Free Memory: \n %s' % MakeHumanReadable( 1665 info['meminfo']['mem_free'] + 1666 info['meminfo']['mem_buffers'] + 1667 info['meminfo']['mem_cached']) 1668 except TypeError: 1669 pass 1670 1671 if 'netstat_end' in info and 'netstat_start' in info: 1672 netstat_after = info['netstat_end'] 1673 netstat_before = info['netstat_start'] 1674 for tcp_type in ('sent', 'received', 'retransmit'): 1675 try: 1676 delta = (netstat_after['tcp_%s' % tcp_type] - 1677 netstat_before['tcp_%s' % tcp_type]) 1678 print 'TCP segments %s during test:\n %d' % (tcp_type, delta) 1679 except TypeError: 1680 pass 1681 else: 1682 print ('TCP segment counts not available because "netstat" was not ' 1683 'found during test runs') 1684 1685 if 'disk_counters_end' in info and 'disk_counters_start' in info: 1686 print 'Disk Counter Deltas:\n', 1687 disk_after = info['disk_counters_end'] 1688 disk_before = info['disk_counters_start'] 1689 print '', 'disk'.rjust(6), 1690 for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime', 1691 'wtime']: 1692 print colname.rjust(8), 1693 print 1694 for diskname in sorted(disk_after): 1695 before = disk_before[diskname] 1696 after = disk_after[diskname] 1697 (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before 1698 (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after 1699 print '', diskname.rjust(6), 1700 deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1, 1701 wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1] 1702 for delta in deltas: 1703 print str(delta).rjust(8), 1704 print 1705 1706 if 'tcp_proc_values' in info: 1707 print 'TCP /proc values:\n', 1708 for item in info['tcp_proc_values'].iteritems(): 1709 print ' %s = %s' % item 1710 1711 if 'boto_https_enabled' in info: 1712 print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled'] 1713 1714 if 'using_proxy' in info: 1715 print 'Requests routed through proxy: \n %s' % info['using_proxy'] 1716 1717 if 'google_host_dns_latency' in info: 1718 print ('Latency of the DNS lookup for Google Storage server (ms): ' 1719 '\n %.1f' % (info['google_host_dns_latency'] * 1000.0)) 1720 1721 if 'google_host_connect_latencies' in info: 1722 print 'Latencies connecting to Google Storage server IPs (ms):' 1723 for ip, latency in info['google_host_connect_latencies'].iteritems(): 1724 print ' %s = %.1f' % (ip, latency * 1000.0) 1725 1726 if 'proxy_dns_latency' in info: 1727 print ('Latency of the DNS lookup for the configured proxy (ms): ' 1728 '\n %.1f' % (info['proxy_dns_latency'] * 1000.0)) 1729 1730 if 'proxy_host_connect_latency' in info: 1731 print ('Latency connecting to the configured proxy (ms): \n %.1f' % 1732 (info['proxy_host_connect_latency'] * 1000.0)) 1733 1734 if 'request_errors' in self.results and 'total_requests' in self.results: 1735 print 1736 print '-' * 78 1737 print 'In-Process HTTP Statistics'.center(78) 1738 print '-' * 78 1739 total = int(self.results['total_requests']) 1740 numerrors = int(self.results['request_errors']) 1741 numbreaks = int(self.results['connection_breaks']) 1742 availability = (((total - numerrors) / float(total)) * 100 1743 if total > 0 else 100) 1744 print 'Total HTTP requests made: %d' % total 1745 print 'HTTP 5xx errors: %d' % numerrors 1746 print 'HTTP connections broken: %d' % numbreaks 1747 print 'Availability: %.7g%%' % availability 1748 if 'error_responses_by_code' in self.results: 1749 sorted_codes = sorted( 1750 self.results['error_responses_by_code'].iteritems()) 1751 if sorted_codes: 1752 print 'Error responses by code:' 1753 print '\n'.join(' %s: %s' % c for c in sorted_codes) 1754 1755 if self.output_file: 1756 with open(self.output_file, 'w') as f: 1757 json.dump(self.results, f, indent=2) 1758 print 1759 print "Output file written to '%s'." % self.output_file 1760 1761 print 1762 1763 def _ParsePositiveInteger(self, val, msg): 1764 """Tries to convert val argument to a positive integer. 1765 1766 Args: 1767 val: The value (as a string) to convert to a positive integer. 1768 msg: The error message to place in the CommandException on an error. 1769 1770 Returns: 1771 A valid positive integer. 1772 1773 Raises: 1774 CommandException: If the supplied value is not a valid positive integer. 1775 """ 1776 try: 1777 val = int(val) 1778 if val < 1: 1779 raise CommandException(msg) 1780 return val 1781 except ValueError: 1782 raise CommandException(msg) 1783 1784 def _ParseArgs(self): 1785 """Parses arguments for perfdiag command.""" 1786 # From -n. 1787 self.num_objects = 5 1788 # From -c. 1789 self.processes = 1 1790 # From -k. 1791 self.threads = 1 1792 # From -p 1793 self.parallel_strategy = None 1794 # From -y 1795 self.num_slices = 4 1796 # From -s. 1797 self.thru_filesize = 1048576 1798 # From -d. 1799 self.directory = tempfile.gettempdir() 1800 # Keep track of whether or not to delete the directory upon completion. 1801 self.delete_directory = False 1802 # From -t. 1803 self.diag_tests = set(self.DEFAULT_DIAG_TESTS) 1804 # From -o. 1805 self.output_file = None 1806 # From -i. 1807 self.input_file = None 1808 # From -m. 1809 self.metadata_keys = {} 1810 1811 if self.sub_opts: 1812 for o, a in self.sub_opts: 1813 if o == '-n': 1814 self.num_objects = self._ParsePositiveInteger( 1815 a, 'The -n parameter must be a positive integer.') 1816 if o == '-c': 1817 self.processes = self._ParsePositiveInteger( 1818 a, 'The -c parameter must be a positive integer.') 1819 if o == '-k': 1820 self.threads = self._ParsePositiveInteger( 1821 a, 'The -k parameter must be a positive integer.') 1822 if o == '-p': 1823 if a.lower() in self.PARALLEL_STRATEGIES: 1824 self.parallel_strategy = a.lower() 1825 else: 1826 raise CommandException( 1827 "'%s' is not a valid parallelism strategy." % a) 1828 if o == '-y': 1829 self.num_slices = self._ParsePositiveInteger( 1830 a, 'The -y parameter must be a positive integer.') 1831 if o == '-s': 1832 try: 1833 self.thru_filesize = HumanReadableToBytes(a) 1834 except ValueError: 1835 raise CommandException('Invalid -s parameter.') 1836 if o == '-d': 1837 self.directory = a 1838 if not os.path.exists(self.directory): 1839 self.delete_directory = True 1840 os.makedirs(self.directory) 1841 if o == '-t': 1842 self.diag_tests = set() 1843 for test_name in a.strip().split(','): 1844 if test_name.lower() not in self.ALL_DIAG_TESTS: 1845 raise CommandException("List of test names (-t) contains invalid " 1846 "test name '%s'." % test_name) 1847 self.diag_tests.add(test_name) 1848 if o == '-m': 1849 pieces = a.split(':') 1850 if len(pieces) != 2: 1851 raise CommandException( 1852 "Invalid metadata key-value combination '%s'." % a) 1853 key, value = pieces 1854 self.metadata_keys[key] = value 1855 if o == '-o': 1856 self.output_file = os.path.abspath(a) 1857 if o == '-i': 1858 self.input_file = os.path.abspath(a) 1859 if not os.path.isfile(self.input_file): 1860 raise CommandException("Invalid input file (-i): '%s'." % a) 1861 try: 1862 with open(self.input_file, 'r') as f: 1863 self.results = json.load(f) 1864 self.logger.info("Read input file: '%s'.", self.input_file) 1865 except ValueError: 1866 raise CommandException("Could not decode input file (-i): '%s'." % 1867 a) 1868 return 1869 1870 # If parallelism is specified, default parallelism strategy to fan. 1871 if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy: 1872 self.parallel_strategy = self.FAN 1873 elif self.processes == 1 and self.threads == 1 and self.parallel_strategy: 1874 raise CommandException( 1875 'Cannot specify parallelism strategy (-p) without also specifying ' 1876 'multiple threads and/or processes (-c and/or -k).') 1877 1878 if not self.args: 1879 self.RaiseWrongNumberOfArgumentsException() 1880 1881 self.bucket_url = StorageUrlFromString(self.args[0]) 1882 self.provider = self.bucket_url.scheme 1883 if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket(): 1884 raise CommandException('The perfdiag command requires a URL that ' 1885 'specifies a bucket.\n"%s" is not ' 1886 'valid.' % self.args[0]) 1887 1888 if (self.thru_filesize > HumanReadableToBytes('2GiB') and 1889 (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)): 1890 raise CommandException( 1891 'For in-memory tests maximum file size is 2GiB. For larger file ' 1892 'sizes, specify rthru_file and/or wthru_file with the -t option.') 1893 1894 perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH) 1895 slice_not_available = ( 1896 self.provider == 's3' and self.diag_tests.intersection(self.WTHRU, 1897 self.WTHRU_FILE)) 1898 if perform_slice and slice_not_available: 1899 raise CommandException('Sliced uploads are not available for s3. ' 1900 'Use -p fan or sequential uploads for s3.') 1901 1902 # Ensure the bucket exists. 1903 self.gsutil_api.GetBucket(self.bucket_url.bucket_name, 1904 provider=self.bucket_url.scheme, 1905 fields=['id']) 1906 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, 1907 socket.timeout, httplib.BadStatusLine, 1908 ServiceException] 1909 1910 # Command entry point. 1911 def RunCommand(self): 1912 """Called by gsutil when the command is being invoked.""" 1913 self._ParseArgs() 1914 1915 if self.input_file: 1916 self._DisplayResults() 1917 return 0 1918 1919 # We turn off retries in the underlying boto library because the 1920 # _RunOperation function handles errors manually so it can count them. 1921 boto.config.set('Boto', 'num_retries', '0') 1922 1923 self.logger.info( 1924 'Number of iterations to run: %d\n' 1925 'Base bucket URI: %s\n' 1926 'Number of processes: %d\n' 1927 'Number of threads: %d\n' 1928 'Parallelism strategy: %s\n' 1929 'Throughput file size: %s\n' 1930 'Diagnostics to run: %s', 1931 self.num_objects, 1932 self.bucket_url, 1933 self.processes, 1934 self.threads, 1935 self.parallel_strategy, 1936 MakeHumanReadable(self.thru_filesize), 1937 (', '.join(self.diag_tests))) 1938 1939 try: 1940 self._SetUp() 1941 1942 # Collect generic system info. 1943 self._CollectSysInfo() 1944 # Collect netstat info and disk counters before tests (and again later). 1945 netstat_output = self._GetTcpStats() 1946 if netstat_output: 1947 self.results['sysinfo']['netstat_start'] = netstat_output 1948 if IS_LINUX: 1949 self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters() 1950 # Record bucket URL. 1951 self.results['bucket_uri'] = str(self.bucket_url) 1952 self.results['json_format'] = 'perfdiag' 1953 self.results['metadata'] = self.metadata_keys 1954 1955 if self.LAT in self.diag_tests: 1956 self._RunLatencyTests() 1957 if self.RTHRU in self.diag_tests: 1958 self._RunReadThruTests() 1959 # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it 1960 # will be used in RTHRU_FILE to save time and bandwidth. 1961 if self.WTHRU_FILE in self.diag_tests: 1962 self._RunWriteThruTests(use_file=True) 1963 if self.RTHRU_FILE in self.diag_tests: 1964 self._RunReadThruTests(use_file=True) 1965 if self.WTHRU in self.diag_tests: 1966 self._RunWriteThruTests() 1967 if self.LIST in self.diag_tests: 1968 self._RunListTests() 1969 1970 # Collect netstat info and disk counters after tests. 1971 netstat_output = self._GetTcpStats() 1972 if netstat_output: 1973 self.results['sysinfo']['netstat_end'] = netstat_output 1974 if IS_LINUX: 1975 self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters() 1976 1977 self.results['total_requests'] = self.total_requests 1978 self.results['request_errors'] = self.request_errors 1979 self.results['error_responses_by_code'] = self.error_responses_by_code 1980 self.results['connection_breaks'] = self.connection_breaks 1981 self.results['gsutil_version'] = gslib.VERSION 1982 self.results['boto_version'] = boto.__version__ 1983 1984 self._TearDown() 1985 self._DisplayResults() 1986 finally: 1987 # TODO: Install signal handlers so this is performed in response to a 1988 # terminating signal; consider multi-threaded object deletes during 1989 # cleanup so it happens quickly. 1990 self._TearDown() 1991 1992 return 0 1993 1994 1995 def StorageUrlToUploadObjectMetadata(storage_url): 1996 if storage_url.IsCloudUrl() and storage_url.IsObject(): 1997 upload_target = apitools_messages.Object() 1998 upload_target.name = storage_url.object_name 1999 upload_target.bucket = storage_url.bucket_name 2000 return upload_target 2001 else: 2002 raise CommandException('Non-cloud URL upload target %s was created in ' 2003 'perfdiag implemenation.' % storage_url) 2004